Skip to content

Commit bed5e4a

Browse files
INGEST: Enable default pipelines (#32286) (#32591)
* INGEST: Enable default pipelines * Add `default_pipeline` index setting * `_none` is interpreted as no pipeline * closes #21101
1 parent 376079a commit bed5e4a

File tree

11 files changed

+200
-23
lines changed

11 files changed

+200
-23
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test index with default pipeline":
10+
- do:
11+
ingest.put_pipeline:
12+
id: "my_pipeline"
13+
body: >
14+
{
15+
"description": "_description",
16+
"processors": [
17+
{
18+
"bytes" : {
19+
"field" : "bytes_source_field",
20+
"target_field" : "bytes_target_field"
21+
}
22+
}
23+
]
24+
}
25+
- match: { acknowledged: true }
26+
27+
- do:
28+
indices.create:
29+
index: test
30+
body:
31+
settings:
32+
index:
33+
default_pipeline: "my_pipeline"
34+
35+
- do:
36+
index:
37+
index: test
38+
type: test
39+
id: 1
40+
body: {bytes_source_field: "1kb"}
41+
42+
- do:
43+
get:
44+
index: test
45+
type: test
46+
id: 1
47+
- match: { _source.bytes_source_field: "1kb" }
48+
- match: { _source.bytes_target_field: 1024 }
49+
50+
- do:
51+
index:
52+
index: test
53+
type: test
54+
id: 2
55+
pipeline: "_none"
56+
body: {bytes_source_field: "1kb"}
57+
58+
- do:
59+
get:
60+
index: test
61+
type: test
62+
id: 2
63+
- match: { _source.bytes_source_field: "1kb" }
64+
- is_false: _source.bytes_target_field
65+
66+
- do:
67+
catch: bad_request
68+
index:
69+
index: test
70+
type: test
71+
id: 3
72+
pipeline: ""
73+
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

-16
Original file line numberDiff line numberDiff line change
@@ -544,22 +544,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
544544
return -1;
545545
}
546546

547-
/**
548-
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
549-
*/
550-
public boolean hasIndexRequestsWithPipelines() {
551-
for (DocWriteRequest actionRequest : requests) {
552-
if (actionRequest instanceof IndexRequest) {
553-
IndexRequest indexRequest = (IndexRequest) actionRequest;
554-
if (Strings.hasText(indexRequest.getPipeline())) {
555-
return true;
556-
}
557-
}
558-
}
559-
560-
return false;
561-
}
562-
563547
@Override
564548
public ActionRequestValidationException validate() {
565549
ActionRequestValidationException validationException = null;

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@
4747
import org.elasticsearch.cluster.metadata.MappingMetaData;
4848
import org.elasticsearch.cluster.metadata.MetaData;
4949
import org.elasticsearch.cluster.service.ClusterService;
50+
import org.elasticsearch.common.collect.ImmutableOpenMap;
5051
import org.elasticsearch.common.inject.Inject;
5152
import org.elasticsearch.common.settings.Settings;
5253
import org.elasticsearch.common.unit.TimeValue;
5354
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5455
import org.elasticsearch.common.util.concurrent.AtomicArray;
5556
import org.elasticsearch.index.Index;
5657
import org.elasticsearch.index.IndexNotFoundException;
58+
import org.elasticsearch.index.IndexSettings;
5759
import org.elasticsearch.index.VersionType;
5860
import org.elasticsearch.index.shard.ShardId;
5961
import org.elasticsearch.indices.IndexClosedException;
@@ -129,7 +131,29 @@ protected final void doExecute(final BulkRequest bulkRequest, final ActionListen
129131

130132
@Override
131133
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
132-
if (bulkRequest.hasIndexRequestsWithPipelines()) {
134+
boolean hasIndexRequestsWithPipelines = false;
135+
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
136+
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
137+
if (actionRequest instanceof IndexRequest) {
138+
IndexRequest indexRequest = (IndexRequest) actionRequest;
139+
String pipeline = indexRequest.getPipeline();
140+
if (pipeline == null) {
141+
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
142+
if (indexMetaData == null) {
143+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
144+
} else {
145+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
146+
indexRequest.setPipeline(defaultPipeline);
147+
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
148+
hasIndexRequestsWithPipelines = true;
149+
}
150+
}
151+
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
152+
hasIndexRequestsWithPipelines = true;
153+
}
154+
}
155+
}
156+
if (hasIndexRequestsWithPipelines) {
133157
if (clusterService.localNode().isIngestNode()) {
134158
processBulkIndexIngestRequest(task, bulkRequest, listener);
135159
} else {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

+4
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ public ActionRequestValidationException validate() {
187187
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
188188
}
189189

190+
if (pipeline != null && pipeline.isEmpty()) {
191+
validationException = addValidationError("pipeline cannot be an empty string", validationException);
192+
}
193+
190194
return validationException;
191195
}
192196

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
153153
EngineConfig.INDEX_CODEC_SETTING,
154154
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
155155
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
156+
IndexSettings.DEFAULT_PIPELINE,
156157
// validate that built-in similarities don't get redefined
157158
Setting.groupSetting("index.similarity.", (s) -> {
158159
Map<String, Settings> groups = s.getAsGroups();

server/src/main/java/org/elasticsearch/index/IndexSettings.java

+20
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.unit.TimeValue;
3333
import org.elasticsearch.index.mapper.AllFieldMapper;
3434
import org.elasticsearch.index.translog.Translog;
35+
import org.elasticsearch.ingest.IngestService;
3536
import org.elasticsearch.node.Node;
3637

3738
import java.util.Collections;
@@ -274,6 +275,14 @@ public final class IndexSettings {
274275
Property.Final);
275276
}
276277

278+
public static final Setting<String> DEFAULT_PIPELINE =
279+
new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
280+
if (s == null || s.isEmpty()) {
281+
throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
282+
}
283+
return s;
284+
}, Property.Dynamic, Property.IndexScope);
285+
277286
private final Index index;
278287
private final Version version;
279288
private final Logger logger;
@@ -311,6 +320,7 @@ public final class IndexSettings {
311320
private volatile int maxShingleDiff;
312321
private volatile int maxAnalyzedOffset;
313322
private volatile int maxTermsCount;
323+
private volatile String defaultPipeline;
314324

315325
/**
316326
* The maximum number of refresh listeners allows on this shard.
@@ -434,6 +444,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
434444
throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
435445
+ version + "]");
436446
}
447+
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
437448

438449
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
439450
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
@@ -470,6 +481,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
470481
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
471482
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
472483
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
484+
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
473485
}
474486

475487
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
@@ -830,4 +842,12 @@ public IndexSortConfig getIndexSortConfig() {
830842
}
831843

832844
public IndexScopedSettings getScopedSettings() { return scopedSettings;}
845+
846+
public String getDefaultPipeline() {
847+
return defaultPipeline;
848+
}
849+
850+
public void setDefaultPipeline(String defaultPipeline) {
851+
this.defaultPipeline = defaultPipeline;
852+
}
833853
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
* Holder class for several ingest related services.
4040
*/
4141
public class IngestService {
42+
43+
public static final String NOOP_PIPELINE_NAME = "_none";
44+
4245
private final PipelineStore pipelineStore;
4346
private final PipelineExecutionService pipelineExecutionService;
4447

server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.update.UpdateRequest;
2525
import org.elasticsearch.cluster.ClusterChangedEvent;
2626
import org.elasticsearch.cluster.ClusterStateApplier;
27-
import org.elasticsearch.common.Strings;
2827
import org.elasticsearch.common.metrics.CounterMetric;
2928
import org.elasticsearch.common.metrics.MeanMetric;
3029
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -73,12 +72,16 @@ protected void doRun() throws Exception {
7372
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
7473
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
7574
}
76-
if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
75+
if (indexRequest == null) {
76+
continue;
77+
}
78+
String pipeline = indexRequest.getPipeline();
79+
if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
7780
try {
7881
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
7982
//this shouldn't be needed here but we do it for consistency with index api
8083
// which requires it to prevent double execution
81-
indexRequest.setPipeline(null);
84+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
8285
} catch (Exception e) {
8386
itemFailureHandler.accept(indexRequest, e);
8487
}

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.support.ActionFilters;
2727
import org.elasticsearch.action.update.UpdateRequest;
2828
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.metadata.MetaData;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.unit.TimeValue;
@@ -45,6 +46,7 @@
4546
import static java.util.Collections.emptySet;
4647
import static java.util.Collections.singleton;
4748
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.when;
4850

4951
public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
5052
public void testNonExceptional() {
@@ -97,7 +99,11 @@ public void testSomeFail() {
9799

98100
private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
99101
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
100-
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
102+
ClusterService clusterService = mock(ClusterService.class);
103+
ClusterState state = mock(ClusterState.class);
104+
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
105+
when(clusterService.state()).thenReturn(state);
106+
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
101107
null, null, null, mock(ActionFilters.class), null, null) {
102108
@Override
103109
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

+51-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.DocWriteRequest;
2425
import org.elasticsearch.action.index.IndexAction;
@@ -28,13 +29,16 @@
2829
import org.elasticsearch.cluster.ClusterChangedEvent;
2930
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.ClusterStateApplier;
32+
import org.elasticsearch.cluster.metadata.IndexMetaData;
33+
import org.elasticsearch.cluster.metadata.MetaData;
3134
import org.elasticsearch.cluster.node.DiscoveryNode;
3235
import org.elasticsearch.cluster.node.DiscoveryNodes;
3336
import org.elasticsearch.cluster.service.ClusterService;
3437
import org.elasticsearch.common.collect.ImmutableOpenMap;
3538
import org.elasticsearch.common.settings.Settings;
3639
import org.elasticsearch.common.util.concurrent.AtomicArray;
3740
import org.elasticsearch.index.IndexNotFoundException;
41+
import org.elasticsearch.index.IndexSettings;
3842
import org.elasticsearch.ingest.IngestService;
3943
import org.elasticsearch.ingest.PipelineExecutionService;
4044
import org.elasticsearch.tasks.Task;
@@ -68,6 +72,11 @@
6872

6973
public class TransportBulkActionIngestTests extends ESTestCase {
7074

75+
/**
76+
* Index for which mock settings contain a default pipeline.
77+
*/
78+
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
79+
7180
/** Services needed by bulk action */
7281
TransportService transportService;
7382
ClusterService clusterService;
@@ -153,6 +162,15 @@ public void setupAction() {
153162
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
154163
ClusterState state = mock(ClusterState.class);
155164
when(state.getNodes()).thenReturn(nodes);
165+
when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
166+
.putAll(
167+
Collections.singletonMap(
168+
WITH_DEFAULT_PIPELINE,
169+
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
170+
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
171+
.build()
172+
).numberOfShards(1).numberOfReplicas(1).build()))
173+
.build()).build());
156174
when(clusterService.state()).thenReturn(state);
157175
doAnswer(invocation -> {
158176
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -227,7 +245,7 @@ public void testIngestLocal() throws Exception {
227245
// now check success
228246
Iterator<DocWriteRequest> req = bulkDocsItr.getValue().iterator();
229247
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
230-
indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
248+
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
231249
completionHandler.getValue().accept(null);
232250
assertTrue(action.isExecuted);
233251
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -259,7 +277,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
259277
assertTrue(failureCalled.get());
260278

261279
// now check success
262-
indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
280+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
263281
completionHandler.getValue().accept(null);
264282
assertTrue(action.isExecuted);
265283
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -359,4 +377,35 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
359377
}
360378
}
361379

380+
public void testUseDefaultPipeline() throws Exception {
381+
Exception exception = new Exception("fake exception");
382+
IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
383+
indexRequest.source(Collections.emptyMap());
384+
AtomicBoolean responseCalled = new AtomicBoolean(false);
385+
AtomicBoolean failureCalled = new AtomicBoolean(false);
386+
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
387+
response -> {
388+
responseCalled.set(true);
389+
},
390+
e -> {
391+
assertThat(e, sameInstance(exception));
392+
failureCalled.set(true);
393+
}));
394+
395+
// check failure works, and passes through to the listener
396+
assertFalse(action.isExecuted); // haven't executed yet
397+
assertFalse(responseCalled.get());
398+
assertFalse(failureCalled.get());
399+
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
400+
completionHandler.getValue().accept(exception);
401+
assertTrue(failureCalled.get());
402+
403+
// now check success
404+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
405+
completionHandler.getValue().accept(null);
406+
assertTrue(action.isExecuted);
407+
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
408+
verifyZeroInteractions(transportService);
409+
}
410+
362411
}

0 commit comments

Comments
 (0)