Skip to content

Commit 3e014d3

Browse files
authored
[Transform] fail to start/put on missing pipeline (#50701) (#50795)
If a pipeline referenced by a transform does not exist, we should not allow the transform to be created. We do allow the pipeline existence check to be skipped with defer_validations, but if the pipeline still does not exist on `_start`, the pipeline will fail to start. relates: #50135
1 parent 4f150e4 commit 3e014d3

File tree

10 files changed

+173
-12
lines changed

10 files changed

+173
-12
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public void testPutTransform() throws IOException, InterruptedException {
134134
.setIndex("pivot-destination")
135135
.setPipeline("my-pipeline").build();
136136
// end::put-transform-dest-config
137+
destConfig = DestConfig.builder().setIndex("pivot-destination").build();
137138
// tag::put-transform-group-config
138139
GroupConfig groupConfig = GroupConfig.builder()
139140
.groupBy("reviewer", // <1>

docs/build.gradle

+17-1
Original file line numberDiff line numberDiff line change
@@ -1226,7 +1226,23 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = '''
12261226
number_of_shards: 1
12271227
number_of_replicas: 0
12281228
'''
1229-
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + '''
1229+
buildRestTests.setups['add_timestamp_pipeline'] = '''
1230+
- do:
1231+
ingest.put_pipeline:
1232+
id: "add_timestamp_pipeline"
1233+
body: >
1234+
{
1235+
"processors": [
1236+
{
1237+
"set" : {
1238+
"field" : "@timestamp",
1239+
"value" : "{{_ingest.timestamp}}"
1240+
}
1241+
}
1242+
]
1243+
}
1244+
'''
1245+
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + buildRestTests.setups['add_timestamp_pipeline'] + '''
12301246
- do:
12311247
raw:
12321248
method: PUT

docs/reference/transform/apis/put-transform.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ PUT _transform/ecommerce_transform
195195
}
196196
}
197197
--------------------------------------------------
198-
// TEST[setup:kibana_sample_data_ecommerce]
198+
// TEST[setup:kibana_sample_data_ecommerce,add_timestamp_pipeline]
199199

200200
When the {transform} is created, you receive the following results:
201201

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class TransformMessages {
2727
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
2828
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
2929
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
30+
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
3031

3132
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
3233

x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml

+49
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,22 @@ setup:
122122
}
123123
- match: { acknowledged: true }
124124

125+
- do:
126+
ingest.put_pipeline:
127+
id: "airline-pipeline"
128+
body: >
129+
{
130+
"processors": [
131+
{
132+
"set" : {
133+
"field" : "some_field",
134+
"value" : 42
135+
}
136+
}
137+
]
138+
}
139+
- match: { acknowledged: true }
140+
125141
- do:
126142
transform.put_transform:
127143
transform_id: "airline-transform-dos"
@@ -631,3 +647,36 @@ setup:
631647
transform_id: "airline-transform-start-delete"
632648
force: true
633649
- match: { acknowledged: true }
650+
---
651+
"Test put transform with missing pipeline":
652+
- do:
653+
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
654+
transform.put_transform:
655+
transform_id: "airline-transform-with-missing-pipeline-crud"
656+
body: >
657+
{
658+
"source": { "index": "airline-data" },
659+
"dest": { "index": "airline-data-by-airline-with-pipeline", "pipeline": "missing-transform-pipeline" },
660+
"pivot": {
661+
"group_by": { "airline": {"terms": {"field": "airline"}}},
662+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
663+
},
664+
"description": "yaml test transform on airline-data"
665+
}
666+
---
667+
"Test put transform with missing pipeline and defer validations":
668+
- do:
669+
transform.put_transform:
670+
defer_validation: true
671+
transform_id: "airline-transform-with-missing-pipeline-crud-defer"
672+
body: >
673+
{
674+
"source": { "index": "airline-data" },
675+
"dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" },
676+
"pivot": {
677+
"group_by": { "airline": {"terms": {"field": "airline"}}},
678+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
679+
},
680+
"description": "yaml test transform on airline-data"
681+
}
682+
- match: {acknowledged: true}

x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml

+56
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,59 @@ teardown:
376376
index: airline-data-time-alias
377377
- match: { airline-data-time-alias.mappings.properties.time.type: date }
378378
- match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
379+
---
380+
"Test start transform with missing pipeline":
381+
- do:
382+
transform.put_transform:
383+
defer_validation: true
384+
transform_id: "airline-transform-with-missing-pipeline"
385+
body: >
386+
{
387+
"source": { "index": "airline-data" },
388+
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "missing-transform-pipeline" },
389+
"pivot": {
390+
"group_by": { "airline": {"terms": {"field": "airline"}}},
391+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
392+
},
393+
"description": "yaml test transform on airline-data"
394+
}
395+
- match: {acknowledged: true}
396+
- do:
397+
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
398+
transform.start_transform:
399+
transform_id: "airline-transform-with-missing-pipeline"
400+
---
401+
"Test start transform with pipeline":
402+
- do:
403+
ingest.put_pipeline:
404+
id: "transform-pipeline"
405+
body: >
406+
{
407+
"processors": [
408+
{
409+
"set" : {
410+
"field" : "some_field",
411+
"value" : 42
412+
}
413+
}
414+
]
415+
}
416+
- match: { acknowledged: true }
417+
418+
- do:
419+
transform.put_transform:
420+
transform_id: "airline-transform-with-pipeline"
421+
body: >
422+
{
423+
"source": { "index": "airline-data" },
424+
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "transform-pipeline" },
425+
"pivot": {
426+
"group_by": { "airline": {"terms": {"field": "airline"}}},
427+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
428+
},
429+
"description": "yaml test transform on airline-data"
430+
}
431+
- match: {acknowledged: true}
432+
- do:
433+
transform.start_transform:
434+
transform_id: "airline-transform-with-pipeline"

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.inject.Inject;
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.license.License;
3031
import org.elasticsearch.license.LicenseUtils;
3132
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -73,6 +74,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
7374
private final SecurityContext securityContext;
7475
private final TransformAuditor auditor;
7576
private final SourceDestValidator sourceDestValidator;
77+
private final IngestService ingestService;
7678

7779
@Inject
7880
public TransportPutTransformAction(
@@ -84,7 +86,8 @@ public TransportPutTransformAction(
8486
ClusterService clusterService,
8587
XPackLicenseState licenseState,
8688
TransformServices transformServices,
87-
Client client
89+
Client client,
90+
IngestService ingestService
8891
) {
8992
this(
9093
PutTransformAction.NAME,
@@ -96,7 +99,8 @@ public TransportPutTransformAction(
9699
clusterService,
97100
licenseState,
98101
transformServices,
99-
client
102+
client,
103+
ingestService
100104
);
101105
}
102106

@@ -110,7 +114,8 @@ protected TransportPutTransformAction(
110114
ClusterService clusterService,
111115
XPackLicenseState licenseState,
112116
TransformServices transformServices,
113-
Client client
117+
Client client,
118+
IngestService ingestService
114119
) {
115120
super(
116121
name,
@@ -137,6 +142,7 @@ protected TransportPutTransformAction(
137142
clusterService.getNodeName(),
138143
License.OperationMode.BASIC.description()
139144
);
145+
this.ingestService = ingestService;
140146
}
141147

142148
static HasPrivilegesRequest buildPrivilegeCheck(
@@ -335,6 +341,16 @@ private void putTransform(Request request, ActionListener<AcknowledgedResponse>
335341
if (request.isDeferValidation()) {
336342
pivotValidationListener.onResponse(true);
337343
} else {
344+
if (config.getDestination().getPipeline() != null) {
345+
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
346+
listener.onFailure(new ElasticsearchStatusException(
347+
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
348+
RestStatus.BAD_REQUEST
349+
)
350+
);
351+
return;
352+
}
353+
}
338354
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
339355
}
340356
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.license.License;
3031
import org.elasticsearch.license.LicenseUtils;
3132
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -70,6 +71,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
7071
private final Client client;
7172
private final TransformAuditor auditor;
7273
private final SourceDestValidator sourceDestValidator;
74+
private final IngestService ingestService;
7375

7476
@Inject
7577
public TransportStartTransformAction(
@@ -82,7 +84,8 @@ public TransportStartTransformAction(
8284
TransformServices transformServices,
8385
PersistentTasksService persistentTasksService,
8486
Client client,
85-
Settings settings
87+
Settings settings,
88+
IngestService ingestService
8689
) {
8790
this(
8891
StartTransformAction.NAME,
@@ -95,7 +98,8 @@ public TransportStartTransformAction(
9598
transformServices,
9699
persistentTasksService,
97100
client,
98-
settings
101+
settings,
102+
ingestService
99103
);
100104
}
101105

@@ -110,7 +114,8 @@ protected TransportStartTransformAction(
110114
TransformServices transformServices,
111115
PersistentTasksService persistentTasksService,
112116
Client client,
113-
Settings settings
117+
Settings settings,
118+
IngestService ingestService
114119
) {
115120
super(
116121
name,
@@ -135,6 +140,7 @@ protected TransportStartTransformAction(
135140
clusterService.getNodeName(),
136141
License.OperationMode.BASIC.description()
137142
);
143+
this.ingestService = ingestService;
138144
}
139145

140146
@Override
@@ -256,6 +262,16 @@ protected void masterOperation(
256262
}
257263
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
258264
transformConfigHolder.set(config);
265+
if (config.getDestination().getPipeline() != null) {
266+
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
267+
listener.onFailure(new ElasticsearchStatusException(
268+
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
269+
RestStatus.BAD_REQUEST
270+
)
271+
);
272+
return;
273+
}
274+
}
259275

260276
sourceDestValidator.validate(
261277
clusterService.state(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPutTransformActionDeprecated.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.inject.Inject;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.ingest.IngestService;
1516
import org.elasticsearch.license.XPackLicenseState;
1617
import org.elasticsearch.threadpool.ThreadPool;
1718
import org.elasticsearch.transport.TransportService;
@@ -31,7 +32,8 @@ public TransportPutTransformActionDeprecated(
3132
ClusterService clusterService,
3233
XPackLicenseState licenseState,
3334
TransformServices transformServices,
34-
Client client
35+
Client client,
36+
IngestService ingestService
3537
) {
3638
super(
3739
PutTransformActionDeprecated.NAME,
@@ -43,7 +45,8 @@ public TransportPutTransformActionDeprecated(
4345
clusterService,
4446
licenseState,
4547
transformServices,
46-
client
48+
client,
49+
ingestService
4750
);
4851
}
4952

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.inject.Inject;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.ingest.IngestService;
1516
import org.elasticsearch.license.XPackLicenseState;
1617
import org.elasticsearch.persistent.PersistentTasksService;
1718
import org.elasticsearch.threadpool.ThreadPool;
@@ -33,7 +34,8 @@ public TransportStartTransformActionDeprecated(
3334
TransformServices transformServices,
3435
PersistentTasksService persistentTasksService,
3536
Client client,
36-
Settings settings
37+
Settings settings,
38+
IngestService ingestService
3739
) {
3840
super(
3941
StartTransformActionDeprecated.NAME,
@@ -46,7 +48,8 @@ public TransportStartTransformActionDeprecated(
4648
transformServices,
4749
persistentTasksService,
4850
client,
49-
settings
51+
settings,
52+
ingestService
5053
);
5154
}
5255
}

0 commit comments

Comments
 (0)