Skip to content

Commit 2079f1c

Browse files
authored
Backport: Fix ingest simulate response document order if processor executes async (#50269)
Backport #50244 to 7.x branch. If a processor executes asynchronously and the ingest simulate api simulates with multiple documents then the order of the documents in the response may not match the order of the documents in the request. Alexander Reelsen discovered this issue with the enrich processor with the following reproduction: ``` PUT cities/_doc/munich {"zip":"80331","city":"Munich"} PUT cities/_doc/berlin {"zip":"10965","city":"Berlin"} PUT /_enrich/policy/zip-policy { "match": { "indices": "cities", "match_field": "zip", "enrich_fields": [ "city" ] } } POST /_enrich/policy/zip-policy/_execute GET _cat/indices/.enrich-* POST /_ingest/pipeline/_simulate { "pipeline": { "processors" : [ { "enrich" : { "policy_name": "zip-policy", "field" : "zip", "target_field": "city", "max_matches": "1" } } ] }, "docs": [ { "_id": "first", "_source" : { "zip" : "80331" } } , { "_id": "second", "_source" : { "zip" : "50667" } } ] } ``` * fixed test compile error
1 parent 4f24739 commit 2079f1c

File tree

3 files changed

+72
-4
lines changed

3 files changed

+72
-4
lines changed

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,21 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
6767
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
6868
threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
6969
final AtomicInteger counter = new AtomicInteger();
70-
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>();
70+
final List<SimulateDocumentResult> responses =
71+
new CopyOnWriteArrayList<>(new SimulateDocumentBaseResult[request.getDocuments().size()]);
72+
int iter = 0;
7173
for (IngestDocument ingestDocument : request.getDocuments()) {
74+
final int index = iter;
7275
executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> {
7376
if (response != null) {
74-
responses.add(response);
77+
responses.set(index, response);
7578
}
7679
if (counter.incrementAndGet() == request.getDocuments().size()) {
7780
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(),
7881
request.isVerbose(), responses));
7982
}
8083
});
84+
iter++;
8185
}
8286
}));
8387
}

server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

+62-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.elasticsearch.action.ingest;
2121

22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.index.VersionType;
24+
import org.elasticsearch.ingest.AbstractProcessor;
2225
import org.elasticsearch.ingest.CompoundProcessor;
2326
import org.elasticsearch.ingest.DropProcessor;
2427
import org.elasticsearch.ingest.IngestDocument;
@@ -29,18 +32,24 @@
2932
import org.elasticsearch.ingest.TestProcessor;
3033
import org.elasticsearch.test.ESTestCase;
3134
import org.elasticsearch.threadpool.TestThreadPool;
32-
import org.elasticsearch.threadpool.TestThreadPool;
35+
import org.elasticsearch.threadpool.ThreadPool;
3336
import org.junit.After;
3437
import org.junit.Before;
3538

39+
import java.util.ArrayList;
3640
import java.util.Collections;
41+
import java.util.HashMap;
42+
import java.util.List;
3743
import java.util.Map;
3844
import java.util.concurrent.CountDownLatch;
45+
import java.util.concurrent.TimeUnit;
3946
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.function.BiConsumer;
4048

4149
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
4250
import static org.hamcrest.Matchers.equalTo;
4351
import static org.hamcrest.Matchers.instanceOf;
52+
import static org.hamcrest.Matchers.is;
4453
import static org.hamcrest.Matchers.not;
4554
import static org.hamcrest.Matchers.notNullValue;
4655
import static org.hamcrest.Matchers.nullValue;
@@ -331,4 +340,56 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception {
331340
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
332341
}
333342

343+
public void testAsyncSimulation() throws Exception {
344+
int numDocs = randomIntBetween(1, 64);
345+
List<IngestDocument> documents = new ArrayList<>(numDocs);
346+
for (int id = 0; id < numDocs; id++) {
347+
documents.add(new IngestDocument("_index", "_type", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>()));
348+
}
349+
Processor processor1 = new AbstractProcessor(null) {
350+
351+
@Override
352+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
353+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
354+
ingestDocument.setFieldValue("processed", true);
355+
handler.accept(ingestDocument, null);
356+
});
357+
}
358+
359+
@Override
360+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
361+
throw new UnsupportedOperationException();
362+
}
363+
364+
@Override
365+
public String getType() {
366+
return "none-of-your-business";
367+
}
368+
};
369+
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1));
370+
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);
371+
372+
AtomicReference<SimulatePipelineResponse> responseHolder = new AtomicReference<>();
373+
AtomicReference<Exception> errorHolder = new AtomicReference<>();
374+
CountDownLatch latch = new CountDownLatch(1);
375+
executionService.execute(request, ActionListener.wrap(response -> {
376+
responseHolder.set(response);
377+
latch.countDown();
378+
}, e -> {
379+
errorHolder.set(e);
380+
latch.countDown();
381+
}));
382+
latch.await(1, TimeUnit.MINUTES);
383+
assertThat(errorHolder.get(), nullValue());
384+
SimulatePipelineResponse response = responseHolder.get();
385+
assertThat(response, notNullValue());
386+
assertThat(response.getResults().size(), equalTo(numDocs));
387+
388+
for (int id = 0; id < numDocs; id++) {
389+
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) response.getResults().get(id);
390+
assertThat(result.getIngestDocument().getMetadata().get(IngestDocument.MetaData.ID), equalTo(Integer.toString(id)));
391+
assertThat(result.getIngestDocument().getSourceAndMetadata().get("processed"), is(true));
392+
}
393+
}
394+
334395
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
6363

6464
@Override
6565
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
66-
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
66+
// Write tp is expected when executing enrich processor from index / bulk api
67+
// Management tp is expected when executing enrich processor from ingest simulate api
68+
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
69+
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
6770
coordinator.schedule(request, listener);
6871
}
6972
}

0 commit comments

Comments
 (0)