Commit 2c7fd82

Allow ingest processors to execute in a non blocking manner. (#46241)
This PR changes ingest execution to be non-blocking by adding a method to the Processor interface that accepts a BiConsumer as a completion handler, and by changing IngestService#executeBulkRequest(...) to ingest documents in a non-blocking fashion if and only if a processor executes in a non-blocking fashion. This is the second PR that merges changes made to the server module on the enrich branch (see #32789) into the master branch. The plan is to merge server-module changes separately from the PR that will merge enrich into master, so that these changes can be reviewed in isolation. This change originates from the enrich branch and was introduced there in #43361.
1 parent 36502b2 · commit 2c7fd82

21 files changed: 862 additions & 403 deletions
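
Before the per-file diffs, a minimal sketch of the contract this commit introduces may help orientation: a callback-based execute(...) added alongside the synchronous one. The default-method bridge below is an illustration of the pattern with hypothetical names (SketchProcessor, SketchDocument), not the exact code of org.elasticsearch.ingest.Processor:

    import java.util.function.BiConsumer;

    // Sketch only: the real interface is org.elasticsearch.ingest.Processor.
    interface SketchProcessor {

        // Existing blocking entry point.
        SketchDocument execute(SketchDocument doc) throws Exception;

        // New non-blocking entry point: an implementation either completes inline
        // or invokes the handler later, possibly from a different thread.
        default void execute(SketchDocument doc, BiConsumer<SketchDocument, Exception> handler) {
            try {
                handler.accept(execute(doc), null); // synchronous processors complete inline
            } catch (Exception e) {
                handler.accept(null, e);            // failures travel through the same callback
            }
        }
    }

    // Placeholder document type for the sketch.
    final class SketchDocument {}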

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 36 additions & 17 deletions
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
 
 import org.elasticsearch.ingest.WrappingProcessor;
 import org.elasticsearch.script.ScriptService;
@@ -65,29 +67,46 @@ boolean isIgnoreMissing() {
     }
 
     @Override
-    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+    public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
         if (values == null) {
             if (ignoreMissing) {
-                return ingestDocument;
+                handler.accept(ingestDocument, null);
+            } else {
+                handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
             }
-            throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
+        } else {
+            List<Object> newValues = new CopyOnWriteArrayList<>();
+            innerExecute(0, values, newValues, ingestDocument, handler);
         }
-        List<Object> newValues = new ArrayList<>(values.size());
-        IngestDocument document = ingestDocument;
-        for (Object value : values) {
-            Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
-            try {
-                document = processor.execute(document);
-                if (document == null) {
-                    return null;
-                }
-            } finally {
-                newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
-            }
+    }
+
+    void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
+                      BiConsumer<IngestDocument, Exception> handler) {
+        if (index == values.size()) {
+            document.setFieldValue(field, new ArrayList<>(newValues));
+            handler.accept(document, null);
+            return;
         }
-        document.setFieldValue(field, newValues);
-        return document;
+
+        Object value = values.get(index);
+        Object previousValue = document.getIngestMetadata().put("_value", value);
+        processor.execute(document, (result, e) -> {
+            if (e != null) {
+                newValues.add(document.getIngestMetadata().put("_value", previousValue));
+                handler.accept(null, e);
+            } else if (result == null) {
+                handler.accept(null, null);
+            } else {
+                newValues.add(document.getIngestMetadata().put("_value", previousValue));
+                innerExecute(index + 1, values, newValues, document, handler);
+            }
+        });
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+        throw new UnsupportedOperationException("this method should not get executed");
     }
 
     @Override
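
The heart of the rewrite above is replacing the for-loop with recursion through the completion callback: innerExecute processes one element and schedules the next from the handler, so iteration survives an inner processor that completes on a different thread (hence the thread-safe CopyOnWriteArrayList for the collected values). A standalone sketch of the pattern, with hypothetical names:

    import java.util.List;
    import java.util.function.BiConsumer;

    // Sketch only: hypothetical names, not the Elasticsearch API.
    final class AsyncForEach {

        interface AsyncStep<T> {
            // Applies one possibly-asynchronous step and reports (result, exception).
            void apply(T value, BiConsumer<T, Exception> done);
        }

        // Processes values[index..] one element at a time. Recursion through the
        // completion callback replaces the loop, so no thread blocks while a step
        // is in flight.
        static <T> void run(int index, List<T> values, AsyncStep<T> step,
                            BiConsumer<List<T>, Exception> handler) {
            if (index == values.size()) {
                handler.accept(values, null);   // all elements processed
                return;
            }
            step.apply(values.get(index), (result, e) -> {
                if (e != null) {
                    handler.accept(null, e);    // abort on the first failure
                } else {
                    values.set(index, result);  // requires a mutable list
                    run(index + 1, values, step, handler);
                }
            });
        }
    }

Because the recursion happens inside the callback, a fully synchronous step still recurses once per element, so stack depth grows with the list length — the same trade-off the innerExecute above makes.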

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

Lines changed: 12 additions & 15 deletions
@@ -53,7 +53,7 @@ public void testExecute() throws Exception {
             "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
             false
         );
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
 
         @SuppressWarnings("unchecked")
         List<String> result = ingestDocument.getFieldValue("values", List.class);
@@ -73,12 +73,9 @@ public void testExecuteWithFailure() throws Exception {
             }
         });
         ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
-        try {
-            processor.execute(ingestDocument);
-            fail("exception expected");
-        } catch (RuntimeException e) {
-            assertThat(e.getMessage(), equalTo("failure"));
-        }
+        Exception[] exceptions = new Exception[1];
+        processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
+        assertThat(exceptions[0].getMessage(), equalTo("failure"));
         assertThat(testProcessor.getInvokedCounter(), equalTo(3));
         assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
 
@@ -95,7 +92,7 @@ public void testExecuteWithFailure() throws Exception {
             "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
             false
         );
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
         assertThat(testProcessor.getInvokedCounter(), equalTo(3));
         assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c")));
     }
@@ -114,7 +111,7 @@ public void testMetaDataAvailable() throws Exception {
             id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
         });
         ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
         assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index"));
@@ -142,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
             "_tag", "values", new SetProcessor("_tag",
                 new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
                 (model) -> model.get("other")), false);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
         assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
@@ -180,7 +177,7 @@ public String getTag() {
         );
 
         ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
         @SuppressWarnings("unchecked")
         List<String> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.size(), equalTo(numValues));
@@ -205,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
             Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
             Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
         ), false);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.get(0), equalTo("STRING"));
@@ -231,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws
         TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
             doc.getFieldValue("_source._value", String.class)));
         ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
-        forEachProcessor.execute(ingestDocument);
+        forEachProcessor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.get(0), equalTo("new_value"));
@@ -264,7 +261,7 @@ public void testNestedForEach() throws Exception {
         );
         ForEachProcessor processor = new ForEachProcessor(
             "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
         assertThat(result.get(0), equalTo("ABC"));
@@ -282,7 +279,7 @@ public void testIgnoreMissing() throws Exception {
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
         TestProcessor testProcessor = new TestProcessor(doc -> {});
         ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
-        processor.execute(ingestDocument);
+        processor.execute(ingestDocument, (result, e) -> {});
         assertIngestDocument(originalIngestDocument, ingestDocument);
         assertThat(testProcessor.getInvokedCounter(), equalTo(0));
     }
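
All of these tests may assert immediately after execute(...) returns because every inner processor used here invokes the handler inline on the calling thread. Against a processor that genuinely forks, a test would instead have to wait for the callback; a hedged helper sketch (not part of this commit):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    // Sketch only: a generic way to block a test until an async callback fires.
    final class AsyncTestSupport {

        static <T> Exception awaitCompletion(Consumer<BiConsumer<T, Exception>> action)
                throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<Exception> error = new AtomicReference<>();
            action.accept((result, e) -> {
                error.set(e);       // capture whatever the handler reported
                latch.countDown();  // release the waiting test thread
            });
            if (latch.await(10, TimeUnit.SECONDS) == false) {
                throw new AssertionError("completion handler was never invoked");
            }
            return error.get();
        }
    }

A test would then call AsyncTestSupport.<IngestDocument>awaitCompletion(handler -> processor.execute(ingestDocument, handler)) and assert on the returned exception.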

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 58 additions & 23 deletions
@@ -19,6 +19,8 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SparseFixedBitSet;
 import org.elasticsearch.Assertions;
@@ -57,6 +59,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -82,6 +85,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
@@ -644,14 +648,13 @@ private long relativeTime() {
     }
 
     void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
-        long ingestStartTimeInNanos = System.nanoTime();
-        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
-        ingestService.executeBulkRequest(() -> bulkRequestModifier,
-            (indexRequest, exception) -> {
-                logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
-                    indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
-                bulkRequestModifier.markCurrentItemAsFailed(exception);
-            }, (exception) -> {
+        final long ingestStartTimeInNanos = System.nanoTime();
+        final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
+        ingestService.executeBulkRequest(
+            original.numberOfActions(),
+            () -> bulkRequestModifier,
+            bulkRequestModifier::markItemAsFailed,
+            (originalThread, exception) -> {
                 if (exception != null) {
                     logger.error("failed to execute pipeline for a bulk request", exception);
                     listener.onFailure(exception);
@@ -666,26 +669,56 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
                     // (this will happen if pre-processing all items in the bulk failed)
                     actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
                 } else {
-                    doExecute(task, bulkRequest, actionListener);
+                    // If a processor went async and returned a response on a different thread, then
+                    // before we continue the bulk request we should fork back onto a write thread:
+                    if (originalThread == Thread.currentThread()) {
+                        assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
+                        doExecute(task, bulkRequest, actionListener);
+                    } else {
+                        threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
+                            @Override
+                            public void onFailure(Exception e) {
+                                listener.onFailure(e);
+                            }
+
+                            @Override
+                            protected void doRun() throws Exception {
+                                doExecute(task, bulkRequest, actionListener);
+                            }
+
+                            @Override
+                            public boolean isForceExecution() {
+                                // If we fork back to a write thread we should **not** fail because the tp queue is full.
+                                // (Otherwise the work done during ingest would be lost.)
+                                // It is okay to force execution here. Throttling of write requests happens prior to
+                                // ingest when a node receives a bulk request.
+                                return true;
+                            }
+                        });
+                    }
                 }
             }
         },
-        indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
+        bulkRequestModifier::markItemAsDropped
+        );
     }
 
     static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
 
+        private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
+
         final BulkRequest bulkRequest;
         final SparseFixedBitSet failedSlots;
         final List<BulkItemResponse> itemResponses;
+        final AtomicIntegerArray originalSlots;
 
-        int currentSlot = -1;
-        int[] originalSlots;
+        volatile int currentSlot = -1;
 
         BulkRequestModifier(BulkRequest bulkRequest) {
             this.bulkRequest = bulkRequest;
            this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
             this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
+            this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
         }
 
         @Override
@@ -709,12 +742,11 @@ BulkRequest getBulkRequest() {
 
             int slot = 0;
             List<DocWriteRequest<?>> requests = bulkRequest.requests();
-            originalSlots = new int[requests.size()]; // oversize, but that's ok
             for (int i = 0; i < requests.size(); i++) {
                 DocWriteRequest<?> request = requests.get(i);
                 if (failedSlots.get(i) == false) {
                     modifiedBulkRequest.add(request);
-                    originalSlots[slot++] = i;
+                    originalSlots.set(slot++, i);
                 }
             }
             return modifiedBulkRequest;
@@ -729,7 +761,7 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
             return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
                 BulkItemResponse[] items = response.getItems();
                 for (int i = 0; i < items.length; i++) {
-                    itemResponses.add(originalSlots[i], response.getItems()[i]);
+                    itemResponses.add(originalSlots.get(i), response.getItems()[i]);
                 }
                 delegatedListener.onResponse(
                     new BulkResponse(
@@ -738,12 +770,12 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
             }
         }
 
-        void markCurrentItemAsDropped() {
-            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
-            failedSlots.set(currentSlot);
+        synchronized void markItemAsDropped(int slot) {
+            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
+            failedSlots.set(slot);
             final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
             itemResponses.add(
-                new BulkItemResponse(currentSlot, indexRequest.opType(),
+                new BulkItemResponse(slot, indexRequest.opType(),
                     new UpdateResponse(
                         new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
                         indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
@@ -753,16 +785,19 @@ void markCurrentItemAsDropped() {
             );
         }
 
-        void markCurrentItemAsFailed(Exception e) {
-            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
+        synchronized void markItemAsFailed(int slot, Exception e) {
+            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
+            LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
+                indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
+
             // We hit an error while preprocessing a request, so we:
            // 1) Remember the request's slot in the bulk, so that when we're done processing all requests we know what failed
             // 2) Add a bulk item failure for this request
             // 3) Continue with the next request in the bulk.
-            failedSlots.set(currentSlot);
+            failedSlots.set(slot);
             BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
                 indexRequest.id(), e);
-            itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
+            itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
         }
 
     }
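
The originalThread comparison above is the crux of the bulk-path change: when ingest completes on the thread that started it, the bulk request continues inline on the write thread it already occupies; when a processor forked, the continuation is rescheduled onto the write pool, force-executed so that finished ingest work is not thrown away because the queue is full. A simplified sketch of that dispatch decision, with hypothetical names and none of the AbstractRunnable machinery:

    import java.util.concurrent.ExecutorService;

    // Sketch only: a reduced version of the fork-back decision.
    final class ForkBack {

        static void continueOn(Thread originalThread, ExecutorService writePool, Runnable continuation) {
            if (originalThread == Thread.currentThread()) {
                // Ingest completed synchronously; we are still on a write thread.
                continuation.run();
            } else {
                // A processor completed us on a foreign thread; rejoin the write pool.
                writePool.execute(continuation);
            }
        }
    }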
