Skip to content

Commit 207525b

Browse files
authored
[Transform] fail to start/put on missing pipeline (#50701)
If a pipeline referenced by a transform does not exist, we should not allow the transform to be created. We do allow the pipeline existence check to be skipped with defer_validations, but if the pipeline still does not exist on `_start`, the pipeline will fail to start. relates: #50135
1 parent ee6fbcc commit 207525b

File tree

10 files changed

+173
-12
lines changed

10 files changed

+173
-12
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public void testPutTransform() throws IOException, InterruptedException {
134134
.setIndex("pivot-destination")
135135
.setPipeline("my-pipeline").build();
136136
// end::put-transform-dest-config
137+
destConfig = DestConfig.builder().setIndex("pivot-destination").build();
137138
// tag::put-transform-group-config
138139
GroupConfig groupConfig = GroupConfig.builder()
139140
.groupBy("reviewer", // <1>

docs/build.gradle

+17-1
Original file line numberDiff line numberDiff line change
@@ -1235,7 +1235,23 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = '''
12351235
number_of_shards: 1
12361236
number_of_replicas: 0
12371237
'''
1238-
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + '''
1238+
buildRestTests.setups['add_timestamp_pipeline'] = '''
1239+
- do:
1240+
ingest.put_pipeline:
1241+
id: "add_timestamp_pipeline"
1242+
body: >
1243+
{
1244+
"processors": [
1245+
{
1246+
"set" : {
1247+
"field" : "@timestamp",
1248+
"value" : "{{_ingest.timestamp}}"
1249+
}
1250+
}
1251+
]
1252+
}
1253+
'''
1254+
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + buildRestTests.setups['add_timestamp_pipeline'] + '''
12391255
- do:
12401256
raw:
12411257
method: PUT

docs/reference/transform/apis/put-transform.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ PUT _transform/ecommerce_transform
195195
}
196196
}
197197
--------------------------------------------------
198-
// TEST[setup:kibana_sample_data_ecommerce]
198+
// TEST[setup:kibana_sample_data_ecommerce,add_timestamp_pipeline]
199199

200200
When the {transform} is created, you receive the following results:
201201

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class TransformMessages {
2727
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
2828
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
2929
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
30+
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
3031

3132
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
3233

x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml

+49
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,22 @@ setup:
122122
}
123123
- match: { acknowledged: true }
124124

125+
- do:
126+
ingest.put_pipeline:
127+
id: "airline-pipeline"
128+
body: >
129+
{
130+
"processors": [
131+
{
132+
"set" : {
133+
"field" : "some_field",
134+
"value" : 42
135+
}
136+
}
137+
]
138+
}
139+
- match: { acknowledged: true }
140+
125141
- do:
126142
transform.put_transform:
127143
transform_id: "airline-transform-dos"
@@ -631,3 +647,36 @@ setup:
631647
transform_id: "airline-transform-start-delete"
632648
force: true
633649
- match: { acknowledged: true }
650+
---
651+
"Test put transform with missing pipeline":
652+
- do:
653+
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
654+
transform.put_transform:
655+
transform_id: "airline-transform-with-missing-pipeline-crud"
656+
body: >
657+
{
658+
"source": { "index": "airline-data" },
659+
"dest": { "index": "airline-data-by-airline-with-pipeline", "pipeline": "missing-transform-pipeline" },
660+
"pivot": {
661+
"group_by": { "airline": {"terms": {"field": "airline"}}},
662+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
663+
},
664+
"description": "yaml test transform on airline-data"
665+
}
666+
---
667+
"Test put transform with missing pipeline and defer validations":
668+
- do:
669+
transform.put_transform:
670+
defer_validation: true
671+
transform_id: "airline-transform-with-missing-pipeline-crud-defer"
672+
body: >
673+
{
674+
"source": { "index": "airline-data" },
675+
"dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" },
676+
"pivot": {
677+
"group_by": { "airline": {"terms": {"field": "airline"}}},
678+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
679+
},
680+
"description": "yaml test transform on airline-data"
681+
}
682+
- match: {acknowledged: true}

x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml

+56
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,59 @@ teardown:
376376
index: airline-data-time-alias
377377
- match: { airline-data-time-alias.mappings.properties.time.type: date }
378378
- match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
379+
---
380+
"Test start transform with missing pipeline":
381+
- do:
382+
transform.put_transform:
383+
defer_validation: true
384+
transform_id: "airline-transform-with-missing-pipeline"
385+
body: >
386+
{
387+
"source": { "index": "airline-data" },
388+
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "missing-transform-pipeline" },
389+
"pivot": {
390+
"group_by": { "airline": {"terms": {"field": "airline"}}},
391+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
392+
},
393+
"description": "yaml test transform on airline-data"
394+
}
395+
- match: {acknowledged: true}
396+
- do:
397+
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
398+
transform.start_transform:
399+
transform_id: "airline-transform-with-missing-pipeline"
400+
---
401+
"Test start transform with pipeline":
402+
- do:
403+
ingest.put_pipeline:
404+
id: "transform-pipeline"
405+
body: >
406+
{
407+
"processors": [
408+
{
409+
"set" : {
410+
"field" : "some_field",
411+
"value" : 42
412+
}
413+
}
414+
]
415+
}
416+
- match: { acknowledged: true }
417+
418+
- do:
419+
transform.put_transform:
420+
transform_id: "airline-transform-with-pipeline"
421+
body: >
422+
{
423+
"source": { "index": "airline-data" },
424+
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "transform-pipeline" },
425+
"pivot": {
426+
"group_by": { "airline": {"terms": {"field": "airline"}}},
427+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
428+
},
429+
"description": "yaml test transform on airline-data"
430+
}
431+
- match: {acknowledged: true}
432+
- do:
433+
transform.start_transform:
434+
transform_id: "airline-transform-with-pipeline"

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.inject.Inject;
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.license.License;
3031
import org.elasticsearch.license.LicenseUtils;
3132
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -74,6 +75,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
7475
private final SecurityContext securityContext;
7576
private final TransformAuditor auditor;
7677
private final SourceDestValidator sourceDestValidator;
78+
private final IngestService ingestService;
7779

7880
@Inject
7981
public TransportPutTransformAction(
@@ -85,7 +87,8 @@ public TransportPutTransformAction(
8587
ClusterService clusterService,
8688
XPackLicenseState licenseState,
8789
TransformServices transformServices,
88-
Client client
90+
Client client,
91+
IngestService ingestService
8992
) {
9093
this(
9194
PutTransformAction.NAME,
@@ -97,7 +100,8 @@ public TransportPutTransformAction(
97100
clusterService,
98101
licenseState,
99102
transformServices,
100-
client
103+
client,
104+
ingestService
101105
);
102106
}
103107

@@ -111,7 +115,8 @@ protected TransportPutTransformAction(
111115
ClusterService clusterService,
112116
XPackLicenseState licenseState,
113117
TransformServices transformServices,
114-
Client client
118+
Client client,
119+
IngestService ingestService
115120
) {
116121
super(
117122
name,
@@ -138,6 +143,7 @@ protected TransportPutTransformAction(
138143
clusterService.getNodeName(),
139144
License.OperationMode.BASIC.description()
140145
);
146+
this.ingestService = ingestService;
141147
}
142148

143149
static HasPrivilegesRequest buildPrivilegeCheck(
@@ -335,6 +341,16 @@ private void putTransform(Request request, ActionListener<AcknowledgedResponse>
335341
if (request.isDeferValidation()) {
336342
pivotValidationListener.onResponse(true);
337343
} else {
344+
if (config.getDestination().getPipeline() != null) {
345+
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
346+
listener.onFailure(new ElasticsearchStatusException(
347+
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
348+
RestStatus.BAD_REQUEST
349+
)
350+
);
351+
return;
352+
}
353+
}
338354
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
339355
}
340356
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.license.License;
3031
import org.elasticsearch.license.LicenseUtils;
3132
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -71,6 +72,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
7172
private final Client client;
7273
private final TransformAuditor auditor;
7374
private final SourceDestValidator sourceDestValidator;
75+
private final IngestService ingestService;
7476

7577
@Inject
7678
public TransportStartTransformAction(
@@ -83,7 +85,8 @@ public TransportStartTransformAction(
8385
TransformServices transformServices,
8486
PersistentTasksService persistentTasksService,
8587
Client client,
86-
Settings settings
88+
Settings settings,
89+
IngestService ingestService
8790
) {
8891
this(
8992
StartTransformAction.NAME,
@@ -96,7 +99,8 @@ public TransportStartTransformAction(
9699
transformServices,
97100
persistentTasksService,
98101
client,
99-
settings
102+
settings,
103+
ingestService
100104
);
101105
}
102106

@@ -111,7 +115,8 @@ protected TransportStartTransformAction(
111115
TransformServices transformServices,
112116
PersistentTasksService persistentTasksService,
113117
Client client,
114-
Settings settings
118+
Settings settings,
119+
IngestService ingestService
115120
) {
116121
super(
117122
name,
@@ -136,6 +141,7 @@ protected TransportStartTransformAction(
136141
clusterService.getNodeName(),
137142
License.OperationMode.BASIC.description()
138143
);
144+
this.ingestService = ingestService;
139145
}
140146

141147
@Override
@@ -258,6 +264,16 @@ protected void masterOperation(
258264
}
259265
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
260266
transformConfigHolder.set(config);
267+
if (config.getDestination().getPipeline() != null) {
268+
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
269+
listener.onFailure(new ElasticsearchStatusException(
270+
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
271+
RestStatus.BAD_REQUEST
272+
)
273+
);
274+
return;
275+
}
276+
}
261277

262278
sourceDestValidator.validate(
263279
clusterService.state(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPutTransformActionDeprecated.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.inject.Inject;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.ingest.IngestService;
1516
import org.elasticsearch.license.XPackLicenseState;
1617
import org.elasticsearch.threadpool.ThreadPool;
1718
import org.elasticsearch.transport.TransportService;
@@ -31,7 +32,8 @@ public TransportPutTransformActionDeprecated(
3132
ClusterService clusterService,
3233
XPackLicenseState licenseState,
3334
TransformServices transformServices,
34-
Client client
35+
Client client,
36+
IngestService ingestService
3537
) {
3638
super(
3739
PutTransformActionDeprecated.NAME,
@@ -43,7 +45,8 @@ public TransportPutTransformActionDeprecated(
4345
clusterService,
4446
licenseState,
4547
transformServices,
46-
client
48+
client,
49+
ingestService
4750
);
4851
}
4952

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.inject.Inject;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.ingest.IngestService;
1516
import org.elasticsearch.license.XPackLicenseState;
1617
import org.elasticsearch.persistent.PersistentTasksService;
1718
import org.elasticsearch.threadpool.ThreadPool;
@@ -33,7 +34,8 @@ public TransportStartTransformActionDeprecated(
3334
TransformServices transformServices,
3435
PersistentTasksService persistentTasksService,
3536
Client client,
36-
Settings settings
37+
Settings settings,
38+
IngestService ingestService
3739
) {
3840
super(
3941
StartTransformActionDeprecated.NAME,
@@ -46,7 +48,8 @@ public TransportStartTransformActionDeprecated(
4648
transformServices,
4749
persistentTasksService,
4850
client,
49-
settings
51+
settings,
52+
ingestService
5053
);
5154
}
5255
}

0 commit comments

Comments
 (0)