@@ -183,7 +183,7 @@ protected void masterOperation(
183
183
}, listener ::onFailure );
184
184
185
185
// <4> Create the task in cluster state so that it will start executing on the node
186
- ActionListener <Void > createOrGetIndexListener = ActionListener .wrap (unused -> {
186
+ ActionListener <Boolean > createOrGetIndexListener = ActionListener .wrap (unused -> {
187
187
TransformTaskParams transformTask = transformTaskHolder .get ();
188
188
assert transformTask != null ;
189
189
PersistentTasksCustomMetaData .PersistentTask <TransformTaskParams > existingTask = getExistingTask (transformTask .getId (), state );
@@ -224,8 +224,10 @@ protected void masterOperation(
224
224
String [] dest = indexNameExpressionResolver .concreteIndexNames (state , IndicesOptions .lenientExpandOpen (), destinationIndex );
225
225
226
226
if (dest .length == 0 ) {
227
- auditor .info (request .getId (), "Creating destination index [" + destinationIndex + "] with deduced mappings." );
228
- createDestinationIndex (transformConfigHolder .get (), createOrGetIndexListener );
227
+ createDestinationIndex (transformConfigHolder .get (), ActionListener .wrap (r -> {
228
+ auditor .info (request .getId (), "Created destination index [" + destinationIndex + "] with deduced mappings." );
229
+ createOrGetIndexListener .onResponse (r );
230
+ }, createOrGetIndexListener ::onFailure ));
229
231
} else {
230
232
auditor .info (request .getId (), "Using existing destination index [" + destinationIndex + "]." );
231
233
ClientHelper .executeAsyncWithOrigin (
@@ -240,12 +242,12 @@ protected void masterOperation(
240
242
"Non-empty destination index [" + destinationIndex + "]. " + "Contains [" + docTotal + "] total documents."
241
243
);
242
244
}
243
- createOrGetIndexListener .onResponse (null );
245
+ createOrGetIndexListener .onResponse (true );
244
246
}, e -> {
245
247
String msg = "Unable to determine destination index stats, error: " + e .getMessage ();
246
- logger .error (msg , e );
248
+ logger .warn (msg , e );
247
249
auditor .warning (request .getId (), msg );
248
- createOrGetIndexListener .onResponse (null );
250
+ createOrGetIndexListener .onResponse (true );
249
251
}),
250
252
client .admin ().indices ()::stats
251
253
);
@@ -267,9 +269,10 @@ protected void masterOperation(
267
269
transformConfigHolder .set (config );
268
270
if (config .getDestination ().getPipeline () != null ) {
269
271
if (ingestService .getPipeline (config .getDestination ().getPipeline ()) == null ) {
270
- listener .onFailure (new ElasticsearchStatusException (
271
- TransformMessages .getMessage (TransformMessages .PIPELINE_MISSING , config .getDestination ().getPipeline ()),
272
- RestStatus .BAD_REQUEST
272
+ listener .onFailure (
273
+ new ElasticsearchStatusException (
274
+ TransformMessages .getMessage (TransformMessages .PIPELINE_MISSING , config .getDestination ().getPipeline ()),
275
+ RestStatus .BAD_REQUEST
273
276
)
274
277
);
275
278
return ;
@@ -289,18 +292,12 @@ protected void masterOperation(
289
292
transformConfigManager .getTransformConfiguration (request .getId (), getTransformListener );
290
293
}
291
294
292
- private void createDestinationIndex (final TransformConfig config , final ActionListener <Void > listener ) {
295
+ private void createDestinationIndex (final TransformConfig config , final ActionListener <Boolean > listener ) {
293
296
294
297
final Pivot pivot = new Pivot (config .getPivotConfig ());
295
298
296
299
ActionListener <Map <String , String >> deduceMappingsListener = ActionListener .wrap (
297
- mappings -> TransformIndex .createDestinationIndex (
298
- client ,
299
- Clock .systemUTC (),
300
- config ,
301
- mappings ,
302
- ActionListener .wrap (r -> listener .onResponse (null ), listener ::onFailure )
303
- ),
300
+ mappings -> TransformIndex .createDestinationIndex (client , Clock .systemUTC (), config , mappings , listener ),
304
301
deduceTargetMappingsException -> listener .onFailure (
305
302
new RuntimeException (TransformMessages .REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS , deduceTargetMappingsException )
306
303
)
0 commit comments