Skip to content

Commit e37e5df

Browse files
authored
ingest: support simulate with verbose for pipeline processor (#33839)
* ingest: support simulate with verbose for pipeline processor This change better supports the use of simulate?verbose with the pipeline processor. Prior to this change any pipeline processors executed with simulate?verbose would not show all intermediate processors for the inner pipelines. This changes also moves the PipelineProcess and TrackingResultProcessor classes to enable instance checks and to avoid overly public classes. As well this updates the error message for when cycles are detected in pipelines calling other pipelines.
1 parent 019aead commit e37e5df

File tree

10 files changed

+509
-173
lines changed

10 files changed

+509
-173
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.unit.TimeValue;
3232
import org.elasticsearch.grok.Grok;
3333
import org.elasticsearch.grok.ThreadWatchdog;
34+
import org.elasticsearch.ingest.PipelineProcessor;
3435
import org.elasticsearch.ingest.Processor;
3536
import org.elasticsearch.plugins.ActionPlugin;
3637
import org.elasticsearch.plugins.IngestPlugin;

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,4 @@ teardown:
110110
pipeline: "outer"
111111
body: {}
112112
- match: { error.root_cause.0.type: "exception" }
113-
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }
113+
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" }

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,3 +605,150 @@ teardown:
605605
- length: { docs.0.processor_results.1: 2 }
606606
- match: { docs.0.processor_results.1.tag: "rename-1" }
607607
- match: { docs.0.processor_results.1.doc._source.new_status: 200 }
608+
609+
---
610+
"Test verbose simulate with Pipeline Processor with Circular Pipelines":
611+
- do:
612+
ingest.put_pipeline:
613+
id: "outer"
614+
body: >
615+
{
616+
"description" : "outer pipeline",
617+
"processors" : [
618+
{
619+
"pipeline" : {
620+
"pipeline": "inner"
621+
}
622+
}
623+
]
624+
}
625+
- match: { acknowledged: true }
626+
627+
- do:
628+
ingest.put_pipeline:
629+
id: "inner"
630+
body: >
631+
{
632+
"description" : "inner pipeline",
633+
"processors" : [
634+
{
635+
"pipeline" : {
636+
"pipeline": "outer"
637+
}
638+
}
639+
]
640+
}
641+
- match: { acknowledged: true }
642+
643+
- do:
644+
catch: /illegal_state_exception/
645+
ingest.simulate:
646+
verbose: true
647+
body: >
648+
{
649+
"pipeline": {
650+
"processors" : [
651+
{
652+
"pipeline" : {
653+
"pipeline": "outer"
654+
}
655+
}
656+
]
657+
}
658+
,
659+
"docs": [
660+
{
661+
"_index": "index",
662+
"_type": "type",
663+
"_id": "id",
664+
"_source": {
665+
"field1": "123.42 400 <foo>"
666+
}
667+
}
668+
]
669+
}
670+
- match: { error.root_cause.0.type: "illegal_state_exception" }
671+
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" }
672+
673+
---
674+
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
675+
- do:
676+
ingest.put_pipeline:
677+
id: "pipeline1"
678+
body: >
679+
{
680+
"processors": [
681+
{
682+
"set": {
683+
"field": "pipeline1",
684+
"value": true
685+
}
686+
},
687+
{
688+
"pipeline": {
689+
"pipeline": "pipeline2"
690+
}
691+
}
692+
]
693+
}
694+
- match: { acknowledged: true }
695+
696+
- do:
697+
ingest.put_pipeline:
698+
id: "pipeline2"
699+
body: >
700+
{
701+
"processors": [
702+
{
703+
"set": {
704+
"field": "pipeline2",
705+
"value": true
706+
}
707+
}
708+
]
709+
}
710+
- match: { acknowledged: true }
711+
712+
- do:
713+
ingest.simulate:
714+
verbose: true
715+
body: >
716+
{
717+
"pipeline": {
718+
"processors": [
719+
{
720+
"set": {
721+
"field": "pipeline0",
722+
"value": true
723+
}
724+
},
725+
{
726+
"pipeline": {
727+
"pipeline": "pipeline1"
728+
}
729+
}
730+
]
731+
},
732+
"docs": [
733+
{
734+
"_index": "index",
735+
"_type": "type",
736+
"_id": "id",
737+
"_source": {
738+
"field1": "123.42 400 <foo>"
739+
}
740+
}
741+
]
742+
}
743+
- length: { docs: 1 }
744+
- length: { docs.0.processor_results: 3 }
745+
- match: { docs.0.processor_results.0.doc._source.pipeline0: true }
746+
- is_false: docs.0.processor_results.0.doc._source.pipeline1
747+
- is_false: docs.0.processor_results.0.doc._source.pipeline2
748+
- match: { docs.0.processor_results.1.doc._source.pipeline0: true }
749+
- match: { docs.0.processor_results.1.doc._source.pipeline1: true }
750+
- is_false: docs.0.processor_results.1.doc._source.pipeline2
751+
- match: { docs.0.processor_results.2.doc._source.pipeline0: true }
752+
- match: { docs.0.processor_results.2.doc._source.pipeline1: true }
753+
- match: { docs.0.processor_results.2.doc._source.pipeline2: true }
754+

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,16 @@
2424
import org.elasticsearch.ingest.IngestDocument;
2525
import org.elasticsearch.ingest.Pipeline;
2626
import org.elasticsearch.ingest.CompoundProcessor;
27+
import org.elasticsearch.ingest.PipelineProcessor;
2728
import org.elasticsearch.threadpool.ThreadPool;
2829

2930
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.IdentityHashMap;
3033
import java.util.List;
34+
import java.util.Set;
3135

32-
import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
36+
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
3337

3438
class SimulateExecutionService {
3539

@@ -42,11 +46,15 @@ class SimulateExecutionService {
4246
}
4347

4448
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
49+
// Prevent cycles in pipeline decoration
50+
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
4551
if (verbose) {
4652
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
47-
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
53+
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
4854
try {
49-
verbosePipelineProcessor.execute(ingestDocument);
55+
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
56+
verbosePipelineProcessor);
57+
ingestDocument.executePipeline(verbosePipeline);
5058
return new SimulateDocumentVerboseResult(processorResultList);
5159
} catch (Exception e) {
5260
return new SimulateDocumentVerboseResult(processorResultList);

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ private static Object deepCopy(Object value) {
647647
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
648648
try {
649649
if (this.executedPipelines.add(pipeline) == false) {
650-
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
650+
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
651651
}
652652
return pipeline.execute(this);
653653
} finally {
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,9 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.ingest.common;
20+
package org.elasticsearch.ingest;
2121

2222
import java.util.Map;
23-
import org.elasticsearch.ingest.AbstractProcessor;
24-
import org.elasticsearch.ingest.ConfigurationUtils;
25-
import org.elasticsearch.ingest.IngestDocument;
26-
import org.elasticsearch.ingest.IngestService;
27-
import org.elasticsearch.ingest.Pipeline;
28-
import org.elasticsearch.ingest.Processor;
2923

3024
public class PipelineProcessor extends AbstractProcessor {
3125

@@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
5044
return ingestDocument.executePipeline(pipeline);
5145
}
5246

47+
Pipeline getPipeline(){
48+
return ingestService.getPipeline(pipelineName);
49+
}
50+
5351
@Override
5452
public String getType() {
5553
return TYPE;

server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java renamed to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.action.ingest;
20+
package org.elasticsearch.ingest;
2121

22-
import org.elasticsearch.ingest.CompoundProcessor;
23-
import org.elasticsearch.ingest.IngestDocument;
24-
import org.elasticsearch.ingest.Processor;
22+
import org.elasticsearch.action.ingest.SimulateProcessorResult;
2523

2624
import java.util.ArrayList;
2725
import java.util.List;
26+
import java.util.Set;
2827

2928
/**
3029
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
@@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor {
3534
private final List<SimulateProcessorResult> processorResultList;
3635
private final boolean ignoreFailure;
3736

38-
public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
37+
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
3938
this.ignoreFailure = ignoreFailure;
4039
this.processorResultList = processorResultList;
4140
this.actualProcessor = actualProcessor;
@@ -67,19 +66,35 @@ public String getTag() {
6766
return actualProcessor.getTag();
6867
}
6968

70-
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
69+
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
70+
Set<PipelineProcessor> pipelinesSeen) {
7171
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
7272
for (Processor processor : compoundProcessor.getProcessors()) {
73-
if (processor instanceof CompoundProcessor) {
74-
processors.add(decorate((CompoundProcessor) processor, processorResultList));
73+
if (processor instanceof PipelineProcessor) {
74+
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
75+
if (pipelinesSeen.add(pipelineProcessor) == false) {
76+
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
77+
}
78+
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
79+
pipelinesSeen.remove(pipelineProcessor);
80+
} else if (processor instanceof CompoundProcessor) {
81+
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
7582
} else {
7683
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
7784
}
7885
}
7986
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
8087
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
81-
if (processor instanceof CompoundProcessor) {
82-
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
88+
if (processor instanceof PipelineProcessor) {
89+
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
90+
if (pipelinesSeen.add(pipelineProcessor) == false) {
91+
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
92+
}
93+
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
94+
pipelinesSeen));
95+
pipelinesSeen.remove(pipelineProcessor);
96+
} else if (processor instanceof CompoundProcessor) {
97+
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
8398
} else {
8499
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
85100
}

0 commit comments

Comments
 (0)