22
22
import org .apache .logging .log4j .Logger ;
23
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
24
24
import org .elasticsearch .ElasticsearchException ;
25
- import org .elasticsearch .Version ;
25
+ import org .elasticsearch .ExceptionsHelper ;
26
+ import org .elasticsearch .ResourceAlreadyExistsException ;
26
27
import org .elasticsearch .action .ActionListener ;
28
+ import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
29
+ import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
27
30
import org .elasticsearch .action .bulk .BackoffPolicy ;
28
31
import org .elasticsearch .action .index .IndexRequestBuilder ;
29
32
import org .elasticsearch .action .index .IndexResponse ;
30
33
import org .elasticsearch .client .Client ;
31
34
import org .elasticsearch .client .OriginSettingClient ;
32
35
import org .elasticsearch .client .Requests ;
36
+ import org .elasticsearch .cluster .ClusterState ;
33
37
import org .elasticsearch .cluster .metadata .IndexMetadata ;
38
+ import org .elasticsearch .cluster .metadata .MappingMetadata ;
39
+ import org .elasticsearch .cluster .service .ClusterService ;
34
40
import org .elasticsearch .common .inject .Inject ;
35
41
import org .elasticsearch .common .settings .Settings ;
36
42
import org .elasticsearch .common .unit .TimeValue ;
37
43
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
38
44
import org .elasticsearch .common .xcontent .ToXContent ;
39
45
import org .elasticsearch .common .xcontent .XContentBuilder ;
40
46
import org .elasticsearch .common .xcontent .XContentFactory ;
41
- import org .elasticsearch .indices .SystemIndexDescriptor ;
47
+ import org .elasticsearch .common .xcontent .XContentType ;
48
+ import org .elasticsearch .core .internal .io .Streams ;
42
49
import org .elasticsearch .threadpool .ThreadPool ;
43
50
51
+ import java .io .ByteArrayOutputStream ;
44
52
import java .io .IOException ;
53
+ import java .io .InputStream ;
54
+ import java .nio .charset .StandardCharsets ;
45
55
import java .util .Iterator ;
56
+ import java .util .Map ;
46
57
47
58
import static org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskAction .TASKS_ORIGIN ;
48
59
import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
49
- import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
50
60
51
61
/**
52
62
* Service that can store task results.
@@ -55,20 +65,15 @@ public class TaskResultsService {
55
65
56
66
private static final Logger logger = LogManager .getLogger (TaskResultsService .class );
57
67
58
- public static final String TASK_TYPE = "task" ;
59
68
public static final String TASK_INDEX = ".tasks" ;
69
+
70
+ public static final String TASK_TYPE = "task" ;
71
+
72
+ public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json" ;
73
+
60
74
public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version" ;
61
75
62
- public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor .builder ()
63
- .setIndexPattern (TASK_INDEX + "*" )
64
- .setPrimaryIndex (TASK_INDEX )
65
- .setDescription ("Task Result Index" )
66
- .setSettings (getTaskResultIndexSettings ())
67
- .setMappings (getTaskResultIndexMappings ())
68
- .setVersionMetaKey (TASK_RESULT_MAPPING_VERSION_META_FIELD )
69
- .setOrigin (TASKS_ORIGIN )
70
- .setIndexType (TASK_TYPE )
71
- .build ();
76
+ public static final int TASK_RESULT_MAPPING_VERSION = 3 ;
72
77
73
78
/**
74
79
* The backoff policy to use when saving a task result fails. The total wait
@@ -79,16 +84,76 @@ public class TaskResultsService {
79
84
80
85
private final Client client ;
81
86
87
+ private final ClusterService clusterService ;
88
+
82
89
private final ThreadPool threadPool ;
83
90
84
91
@ Inject
85
- public TaskResultsService (Client client , ThreadPool threadPool ) {
92
+ public TaskResultsService (Client client , ClusterService clusterService , ThreadPool threadPool ) {
86
93
this .client = new OriginSettingClient (client , TASKS_ORIGIN );
94
+ this .clusterService = clusterService ;
87
95
this .threadPool = threadPool ;
88
96
}
89
97
90
98
public void storeResult (TaskResult taskResult , ActionListener <Void > listener ) {
91
- IndexRequestBuilder index = client .prepareIndex (TASK_INDEX , TASK_TYPE ).setId (taskResult .getTask ().getTaskId ().toString ());
99
+
100
+ ClusterState state = clusterService .state ();
101
+
102
+ if (state .routingTable ().hasIndex (TASK_INDEX ) == false ) {
103
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest ();
104
+ createIndexRequest .settings (taskResultIndexSettings ());
105
+ createIndexRequest .index (TASK_INDEX );
106
+ createIndexRequest .mapping (TASK_TYPE , taskResultIndexMapping (), XContentType .JSON );
107
+ createIndexRequest .cause ("auto(task api)" );
108
+
109
+ client .admin ().indices ().create (createIndexRequest , new ActionListener <CreateIndexResponse >() {
110
+ @ Override
111
+ public void onResponse (CreateIndexResponse result ) {
112
+ doStoreResult (taskResult , listener );
113
+ }
114
+
115
+ @ Override
116
+ public void onFailure (Exception e ) {
117
+ if (ExceptionsHelper .unwrapCause (e ) instanceof ResourceAlreadyExistsException ) {
118
+ // we have the index, do it
119
+ try {
120
+ doStoreResult (taskResult , listener );
121
+ } catch (Exception inner ) {
122
+ inner .addSuppressed (e );
123
+ listener .onFailure (inner );
124
+ }
125
+ } else {
126
+ listener .onFailure (e );
127
+ }
128
+ }
129
+ });
130
+ } else {
131
+ IndexMetadata metadata = state .getMetadata ().index (TASK_INDEX );
132
+ if (getTaskResultMappingVersion (metadata ) < TASK_RESULT_MAPPING_VERSION ) {
133
+ // The index already exists but doesn't have our mapping
134
+ client .admin ().indices ().preparePutMapping (TASK_INDEX ).setType (TASK_TYPE )
135
+ .setSource (taskResultIndexMapping (), XContentType .JSON )
136
+ .execute (ActionListener .delegateFailure (listener , (l , r ) -> doStoreResult (taskResult , listener )));
137
+ } else {
138
+ doStoreResult (taskResult , listener );
139
+ }
140
+ }
141
+ }
142
+
143
+ private int getTaskResultMappingVersion (IndexMetadata metadata ) {
144
+ MappingMetadata mappingMetadata = metadata .getMappings ().get (TASK_TYPE );
145
+ if (mappingMetadata == null ) {
146
+ return 0 ;
147
+ }
148
+ @ SuppressWarnings ("unchecked" ) Map <String , Object > meta = (Map <String , Object >) mappingMetadata .sourceAsMap ().get ("_meta" );
149
+ if (meta == null || meta .containsKey (TASK_RESULT_MAPPING_VERSION_META_FIELD ) == false ) {
150
+ return 1 ; // The mapping was created before meta field was introduced
151
+ }
152
+ return (int ) meta .get (TASK_RESULT_MAPPING_VERSION_META_FIELD );
153
+ }
154
+
155
+ private void doStoreResult (TaskResult taskResult , ActionListener <Void > listener ) {
156
+ IndexRequestBuilder index = client .prepareIndex (TASK_INDEX , TASK_TYPE , taskResult .getTask ().getTaskId ().toString ());
92
157
try (XContentBuilder builder = XContentFactory .contentBuilder (Requests .INDEX_CONTENT_TYPE )) {
93
158
taskResult .toXContent (builder , ToXContent .EMPTY_PARAMS );
94
159
index .setSource (builder );
@@ -119,106 +184,24 @@ public void onFailure(Exception e) {
119
184
});
120
185
}
121
186
122
- private static Settings getTaskResultIndexSettings () {
187
+ private Settings taskResultIndexSettings () {
123
188
return Settings .builder ()
124
189
.put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 )
125
190
.put (IndexMetadata .INDEX_AUTO_EXPAND_REPLICAS_SETTING .getKey (), "0-1" )
126
191
.put (IndexMetadata .SETTING_PRIORITY , Integer .MAX_VALUE )
127
192
.build ();
128
193
}
129
194
130
- private static XContentBuilder getTaskResultIndexMappings () {
131
- try {
132
- final XContentBuilder builder = jsonBuilder ();
133
-
134
- builder .startObject ();
135
- {
136
- builder .startObject (TASK_TYPE );
137
- builder .field ("dynamic" , "strict" );
138
- {
139
- builder .startObject ("_meta" );
140
- builder .field (TASK_RESULT_MAPPING_VERSION_META_FIELD , Version .CURRENT .toString ());
141
- builder .endObject ();
142
-
143
- builder .startObject ("properties" );
144
- {
145
- builder .startObject ("completed" );
146
- builder .field ("type" , "boolean" );
147
- builder .endObject ();
148
-
149
- builder .startObject ("task" );
150
- {
151
- builder .startObject ("properties" );
152
- {
153
- builder .startObject ("action" );
154
- builder .field ("type" , "keyword" );
155
- builder .endObject ();
156
-
157
- builder .startObject ("cancellable" );
158
- builder .field ("type" , "boolean" );
159
- builder .endObject ();
160
-
161
- builder .startObject ("id" );
162
- builder .field ("type" , "long" );
163
- builder .endObject ();
164
-
165
- builder .startObject ("parent_task_id" );
166
- builder .field ("type" , "keyword" );
167
- builder .endObject ();
168
-
169
- builder .startObject ("node" );
170
- builder .field ("type" , "keyword" );
171
- builder .endObject ();
172
-
173
- builder .startObject ("running_time_in_nanos" );
174
- builder .field ("type" , "long" );
175
- builder .endObject ();
176
-
177
- builder .startObject ("start_time_in_millis" );
178
- builder .field ("type" , "long" );
179
- builder .endObject ();
180
-
181
- builder .startObject ("type" );
182
- builder .field ("type" , "keyword" );
183
- builder .endObject ();
184
-
185
- builder .startObject ("status" );
186
- builder .field ("type" , "object" );
187
- builder .field ("enabled" , false );
188
- builder .endObject ();
189
-
190
- builder .startObject ("description" );
191
- builder .field ("type" , "text" );
192
- builder .endObject ();
193
-
194
- builder .startObject ("headers" );
195
- builder .field ("type" , "object" );
196
- builder .field ("enabled" , false );
197
- builder .endObject ();
198
- }
199
- builder .endObject ();
200
- }
201
- builder .endObject ();
202
-
203
- builder .startObject ("response" );
204
- builder .field ("type" , "object" );
205
- builder .field ("enabled" , false );
206
- builder .endObject ();
207
-
208
- builder .startObject ("error" );
209
- builder .field ("type" , "object" );
210
- builder .field ("enabled" , false );
211
- builder .endObject ();
212
- }
213
- builder .endObject ();
214
- }
215
- builder .endObject ();
216
- }
217
-
218
- builder .endObject ();
219
- return builder ;
220
- } catch (IOException e ) {
221
- throw new RuntimeException ("Failed to build " + TASK_INDEX + " index mappings" , e );
195
+ public String taskResultIndexMapping () {
196
+ try (InputStream is = getClass ().getResourceAsStream (TASK_RESULT_INDEX_MAPPING_FILE )) {
197
+ ByteArrayOutputStream out = new ByteArrayOutputStream ();
198
+ Streams .copy (is , out );
199
+ return out .toString (StandardCharsets .UTF_8 .name ());
200
+ } catch (Exception e ) {
201
+ logger .error (() -> new ParameterizedMessage (
202
+ "failed to create tasks results index template [{}]" , TASK_RESULT_INDEX_MAPPING_FILE ), e );
203
+ throw new IllegalStateException ("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]" , e );
222
204
}
205
+
223
206
}
224
207
}
0 commit comments