11
11
import org .apache .logging .log4j .Logger ;
12
12
import org .apache .logging .log4j .message .ParameterizedMessage ;
13
13
import org .elasticsearch .ElasticsearchException ;
14
- import org .elasticsearch .ExceptionsHelper ;
15
- import org .elasticsearch .ResourceAlreadyExistsException ;
14
+ import org .elasticsearch .Version ;
16
15
import org .elasticsearch .action .ActionListener ;
17
- import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
18
- import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
19
16
import org .elasticsearch .action .bulk .BackoffPolicy ;
20
17
import org .elasticsearch .action .index .IndexRequestBuilder ;
21
18
import org .elasticsearch .action .index .IndexResponse ;
22
19
import org .elasticsearch .client .Client ;
23
20
import org .elasticsearch .client .OriginSettingClient ;
24
21
import org .elasticsearch .client .Requests ;
25
- import org .elasticsearch .cluster .ClusterState ;
26
22
import org .elasticsearch .cluster .metadata .IndexMetadata ;
27
- import org .elasticsearch .cluster .metadata .MappingMetadata ;
28
- import org .elasticsearch .cluster .service .ClusterService ;
29
23
import org .elasticsearch .common .inject .Inject ;
30
24
import org .elasticsearch .common .settings .Settings ;
31
25
import org .elasticsearch .common .unit .TimeValue ;
32
26
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
33
27
import org .elasticsearch .common .xcontent .ToXContent ;
34
28
import org .elasticsearch .common .xcontent .XContentBuilder ;
35
29
import org .elasticsearch .common .xcontent .XContentFactory ;
36
- import org .elasticsearch .common .xcontent .XContentType ;
37
- import org .elasticsearch .core .internal .io .Streams ;
30
+ import org .elasticsearch .indices .SystemIndexDescriptor ;
38
31
import org .elasticsearch .threadpool .ThreadPool ;
39
32
40
- import java .io .ByteArrayOutputStream ;
41
33
import java .io .IOException ;
42
- import java .io .InputStream ;
43
- import java .nio .charset .StandardCharsets ;
34
+ import java .io .UncheckedIOException ;
44
35
import java .util .Iterator ;
45
- import java .util .Map ;
46
36
47
37
import static org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskAction .TASKS_ORIGIN ;
48
38
import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
39
+ import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
49
40
50
41
/**
51
42
* Service that can store task results.
@@ -55,12 +46,17 @@ public class TaskResultsService {
55
46
private static final Logger logger = LogManager .getLogger (TaskResultsService .class );
56
47
57
48
public static final String TASK_INDEX = ".tasks" ;
58
-
59
- public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json" ;
60
-
61
49
public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version" ;
62
50
63
- public static final int TASK_RESULT_MAPPING_VERSION = 3 ;
51
+ public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor .builder ()
52
+ .setIndexPattern (TASK_INDEX + "*" )
53
+ .setPrimaryIndex (TASK_INDEX )
54
+ .setDescription ("Task Result Index" )
55
+ .setSettings (getTaskResultIndexSettings ())
56
+ .setMappings (getTaskResultIndexMappings ())
57
+ .setVersionMetaKey (TASK_RESULT_MAPPING_VERSION_META_FIELD )
58
+ .setOrigin (TASKS_ORIGIN )
59
+ .build ();
64
60
65
61
/**
66
62
* The backoff policy to use when saving a task result fails. The total wait
@@ -71,75 +67,15 @@ public class TaskResultsService {
71
67
72
68
private final Client client ;
73
69
74
- private final ClusterService clusterService ;
75
-
76
70
private final ThreadPool threadPool ;
77
71
78
72
@ Inject
79
- public TaskResultsService (Client client , ClusterService clusterService , ThreadPool threadPool ) {
73
+ public TaskResultsService (Client client , ThreadPool threadPool ) {
80
74
this .client = new OriginSettingClient (client , TASKS_ORIGIN );
81
- this .clusterService = clusterService ;
82
75
this .threadPool = threadPool ;
83
76
}
84
77
85
78
public void storeResult (TaskResult taskResult , ActionListener <Void > listener ) {
86
-
87
- ClusterState state = clusterService .state ();
88
-
89
- if (state .routingTable ().hasIndex (TASK_INDEX ) == false ) {
90
- CreateIndexRequest createIndexRequest = new CreateIndexRequest ();
91
- createIndexRequest .settings (taskResultIndexSettings ());
92
- createIndexRequest .index (TASK_INDEX );
93
- createIndexRequest .mapping (taskResultIndexMapping ());
94
- createIndexRequest .cause ("auto(task api)" );
95
-
96
- client .admin ().indices ().create (createIndexRequest , new ActionListener <CreateIndexResponse >() {
97
- @ Override
98
- public void onResponse (CreateIndexResponse result ) {
99
- doStoreResult (taskResult , listener );
100
- }
101
-
102
- @ Override
103
- public void onFailure (Exception e ) {
104
- if (ExceptionsHelper .unwrapCause (e ) instanceof ResourceAlreadyExistsException ) {
105
- // we have the index, do it
106
- try {
107
- doStoreResult (taskResult , listener );
108
- } catch (Exception inner ) {
109
- inner .addSuppressed (e );
110
- listener .onFailure (inner );
111
- }
112
- } else {
113
- listener .onFailure (e );
114
- }
115
- }
116
- });
117
- } else {
118
- IndexMetadata metadata = state .getMetadata ().index (TASK_INDEX );
119
- if (getTaskResultMappingVersion (metadata ) < TASK_RESULT_MAPPING_VERSION ) {
120
- // The index already exists but doesn't have our mapping
121
- client .admin ().indices ().preparePutMapping (TASK_INDEX )
122
- .setSource (taskResultIndexMapping (), XContentType .JSON )
123
- .execute (ActionListener .delegateFailure (listener , (l , r ) -> doStoreResult (taskResult , listener )));
124
- } else {
125
- doStoreResult (taskResult , listener );
126
- }
127
- }
128
- }
129
-
130
- private int getTaskResultMappingVersion (IndexMetadata metadata ) {
131
- MappingMetadata mappingMetadata = metadata .mapping ();
132
- if (mappingMetadata == null ) {
133
- return 0 ;
134
- }
135
- @ SuppressWarnings ("unchecked" ) Map <String , Object > meta = (Map <String , Object >) mappingMetadata .sourceAsMap ().get ("_meta" );
136
- if (meta == null || meta .containsKey (TASK_RESULT_MAPPING_VERSION_META_FIELD ) == false ) {
137
- return 1 ; // The mapping was created before meta field was introduced
138
- }
139
- return (int ) meta .get (TASK_RESULT_MAPPING_VERSION_META_FIELD );
140
- }
141
-
142
- private void doStoreResult (TaskResult taskResult , ActionListener <Void > listener ) {
143
79
IndexRequestBuilder index = client .prepareIndex (TASK_INDEX ).setId (taskResult .getTask ().getTaskId ().toString ());
144
80
try (XContentBuilder builder = XContentFactory .contentBuilder (Requests .INDEX_CONTENT_TYPE )) {
145
81
taskResult .toXContent (builder , ToXContent .EMPTY_PARAMS );
@@ -171,24 +107,102 @@ public void onFailure(Exception e) {
171
107
});
172
108
}
173
109
174
- private Settings taskResultIndexSettings () {
110
+ private static Settings getTaskResultIndexSettings () {
175
111
return Settings .builder ()
176
112
.put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 )
177
113
.put (IndexMetadata .INDEX_AUTO_EXPAND_REPLICAS_SETTING .getKey (), "0-1" )
178
114
.put (IndexMetadata .SETTING_PRIORITY , Integer .MAX_VALUE )
179
115
.build ();
180
116
}
181
117
182
- public String taskResultIndexMapping () {
183
- try (InputStream is = getClass ().getResourceAsStream (TASK_RESULT_INDEX_MAPPING_FILE )) {
184
- ByteArrayOutputStream out = new ByteArrayOutputStream ();
185
- Streams .copy (is , out );
186
- return out .toString (StandardCharsets .UTF_8 .name ());
187
- } catch (Exception e ) {
188
- logger .error (() -> new ParameterizedMessage (
189
- "failed to create tasks results index template [{}]" , TASK_RESULT_INDEX_MAPPING_FILE ), e );
190
- throw new IllegalStateException ("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]" , e );
191
- }
118
+ private static XContentBuilder getTaskResultIndexMappings () {
119
+ try {
120
+ final XContentBuilder builder = jsonBuilder ();
121
+
122
+ builder .startObject ();
123
+ {
124
+ builder .startObject ("_meta" );
125
+ builder .field (TASK_RESULT_MAPPING_VERSION_META_FIELD , Version .CURRENT .toString ());
126
+ builder .endObject ();
127
+
128
+ builder .field ("dynamic" , "strict" );
129
+ builder .startObject ("properties" );
130
+ {
131
+ builder .startObject ("completed" );
132
+ builder .field ("type" , "boolean" );
133
+ builder .endObject ();
134
+
135
+ builder .startObject ("task" );
136
+ {
137
+ builder .startObject ("properties" );
138
+ {
139
+ builder .startObject ("action" );
140
+ builder .field ("type" , "keyword" );
141
+ builder .endObject ();
142
+
143
+ builder .startObject ("cancellable" );
144
+ builder .field ("type" , "boolean" );
145
+ builder .endObject ();
146
+
147
+ builder .startObject ("id" );
148
+ builder .field ("type" , "long" );
149
+ builder .endObject ();
150
+
151
+ builder .startObject ("parent_task_id" );
152
+ builder .field ("type" , "keyword" );
153
+ builder .endObject ();
154
+
155
+ builder .startObject ("node" );
156
+ builder .field ("type" , "keyword" );
157
+ builder .endObject ();
158
+
159
+ builder .startObject ("running_time_in_nanos" );
160
+ builder .field ("type" , "long" );
161
+ builder .endObject ();
162
+
163
+ builder .startObject ("start_time_in_millis" );
164
+ builder .field ("type" , "long" );
165
+ builder .endObject ();
166
+
167
+ builder .startObject ("type" );
168
+ builder .field ("type" , "keyword" );
169
+ builder .endObject ();
170
+
171
+ builder .startObject ("status" );
172
+ builder .field ("type" , "object" );
173
+ builder .field ("enabled" , false );
174
+ builder .endObject ();
175
+
176
+ builder .startObject ("description" );
177
+ builder .field ("type" , "text" );
178
+ builder .endObject ();
179
+
180
+ builder .startObject ("headers" );
181
+ builder .field ("type" , "object" );
182
+ builder .field ("enabled" , false );
183
+ builder .endObject ();
184
+ }
185
+ builder .endObject ();
186
+ }
187
+ builder .endObject ();
188
+
189
+ builder .startObject ("response" );
190
+ builder .field ("type" , "object" );
191
+ builder .field ("enabled" , false );
192
+ builder .endObject ();
192
193
194
+ builder .startObject ("error" );
195
+ builder .field ("type" , "object" );
196
+ builder .field ("enabled" , false );
197
+ builder .endObject ();
198
+ }
199
+ builder .endObject ();
200
+ }
201
+
202
+ builder .endObject ();
203
+ return builder ;
204
+ } catch (IOException e ) {
205
+ throw new UncheckedIOException ("Failed to build " + TASK_INDEX + " index mappings" , e );
206
+ }
193
207
}
194
208
}
0 commit comments