Skip to content

Commit 2561b94

Browse files
Async Processors and Final Pipelines are Broken (#69818)
There was an obvious race here where async processor and final pipeline will run concurrently (or the final pipeline runs multiple times in from the while loop). relates #52339 (fixes one failure scenario here but since the failure also occurred in 7.10.x not all of them)
1 parent c0fc847 commit 2561b94

File tree

2 files changed

+57
-54
lines changed

2 files changed

+57
-54
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Collection;
4848
import java.util.List;
4949
import java.util.Map;
50+
import java.util.function.BiConsumer;
5051
import java.util.function.Supplier;
5152

5253
import static org.hamcrest.Matchers.containsString;
@@ -357,9 +358,17 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
357358
new AbstractProcessor(tag, description) {
358359

359360
@Override
360-
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
361-
ingestDocument.setFieldValue("default", true);
362-
return ingestDocument;
361+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
362+
// randomize over sync and async execution
363+
randomFrom(parameters.genericExecutor, Runnable::run).accept(() -> {
364+
ingestDocument.setFieldValue("default", true);
365+
handler.accept(ingestDocument, null);
366+
});
367+
}
368+
369+
@Override
370+
public IngestDocument execute(IngestDocument ingestDocument) {
371+
throw new AssertionError("should not be called");
363372
}
364373

365374
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -496,67 +496,61 @@ private void executePipelines(
496496
final BiConsumer<Thread, Exception> onCompletion,
497497
final Thread originalThread
498498
) {
499-
while (it.hasNext()) {
500-
final String pipelineId = it.next();
501-
try {
502-
PipelineHolder holder = pipelines.get(pipelineId);
503-
if (holder == null) {
504-
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
499+
assert it.hasNext();
500+
final String pipelineId = it.next();
501+
try {
502+
PipelineHolder holder = pipelines.get(pipelineId);
503+
if (holder == null) {
504+
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
505+
}
506+
Pipeline pipeline = holder.pipeline;
507+
String originalIndex = indexRequest.indices()[0];
508+
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
509+
if (e != null) {
510+
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
511+
pipelineId, indexRequest.index(), indexRequest.id()), e);
512+
onFailure.accept(slot, e);
505513
}
506-
Pipeline pipeline = holder.pipeline;
507-
String originalIndex = indexRequest.indices()[0];
508-
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
509-
if (e != null) {
510-
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
511-
pipelineId, indexRequest.index(), indexRequest.id()), e);
512-
onFailure.accept(slot, e);
513-
}
514514

515-
Iterator<String> newIt = it;
516-
boolean newHasFinalPipeline = hasFinalPipeline;
517-
String newIndex = indexRequest.indices()[0];
515+
Iterator<String> newIt = it;
516+
boolean newHasFinalPipeline = hasFinalPipeline;
517+
String newIndex = indexRequest.indices()[0];
518518

519-
if (Objects.equals(originalIndex, newIndex) == false) {
520-
if (hasFinalPipeline && it.hasNext() == false) {
521-
totalMetrics.ingestFailed();
522-
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
523-
"] can't change the target index"));
519+
if (Objects.equals(originalIndex, newIndex) == false) {
520+
if (hasFinalPipeline && it.hasNext() == false) {
521+
totalMetrics.ingestFailed();
522+
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
523+
"] can't change the target index"));
524+
} else {
525+
indexRequest.isPipelineResolved(false);
526+
resolvePipelines(null, indexRequest, state.metadata());
527+
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
528+
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
529+
newHasFinalPipeline = true;
524530
} else {
525-
526-
//Drain old it so it's not looped over
527-
it.forEachRemaining($ -> {
528-
});
529-
indexRequest.isPipelineResolved(false);
530-
resolvePipelines(null, indexRequest, state.metadata());
531-
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
532-
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
533-
newHasFinalPipeline = true;
534-
} else {
535-
newIt = Collections.emptyIterator();
536-
}
531+
newIt = Collections.emptyIterator();
537532
}
538533
}
534+
}
539535

540-
if (newIt.hasNext()) {
541-
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
542-
originalThread);
543-
} else {
544-
if (counter.decrementAndGet() == 0) {
545-
onCompletion.accept(originalThread, null);
546-
}
547-
assert counter.get() >= 0;
536+
if (newIt.hasNext()) {
537+
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
538+
originalThread);
539+
} else {
540+
if (counter.decrementAndGet() == 0) {
541+
onCompletion.accept(originalThread, null);
548542
}
549-
});
550-
} catch (Exception e) {
551-
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
552-
pipelineId, indexRequest.index(), indexRequest.id()), e);
553-
onFailure.accept(slot, e);
554-
if (counter.decrementAndGet() == 0) {
555-
onCompletion.accept(originalThread, null);
543+
assert counter.get() >= 0;
556544
}
557-
assert counter.get() >= 0;
558-
break;
545+
});
546+
} catch (Exception e) {
547+
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
548+
pipelineId, indexRequest.index(), indexRequest.id()), e);
549+
onFailure.accept(slot, e);
550+
if (counter.decrementAndGet() == 0) {
551+
onCompletion.accept(originalThread, null);
559552
}
553+
assert counter.get() >= 0;
560554
}
561555
}
562556

0 commit comments

Comments
 (0)