Skip to content

Commit 753526e

Browse files
Async Processors and Final Pipelines are Broken (#69818) (#69848)
There was an obvious race here where async processor and final pipeline will run concurrently (or the final pipeline runs multiple times in from the while loop). relates #52339 (fixes one failure scenario here but since the failure also occurred in 7.10.x not all of them)
1 parent af97090 commit 753526e

File tree

2 files changed

+57
-54
lines changed

2 files changed

+57
-54
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Collections;
4949
import java.util.HashMap;
5050
import java.util.Map;
51+
import java.util.function.BiConsumer;
5152
import java.util.function.Supplier;
5253

5354
import static org.hamcrest.Matchers.containsString;
@@ -359,9 +360,17 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
359360
new AbstractProcessor(tag, description) {
360361

361362
@Override
362-
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
363-
ingestDocument.setFieldValue("default", true);
364-
return ingestDocument;
363+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
364+
// randomize over sync and async execution
365+
randomFrom(parameters.genericExecutor, Runnable::run).accept(() -> {
366+
ingestDocument.setFieldValue("default", true);
367+
handler.accept(ingestDocument, null);
368+
});
369+
}
370+
371+
@Override
372+
public IngestDocument execute(IngestDocument ingestDocument) {
373+
throw new AssertionError("should not be called");
365374
}
366375

367376
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

+45-51
Original file line numberDiff line numberDiff line change
@@ -497,67 +497,61 @@ private void executePipelines(
497497
final BiConsumer<Thread, Exception> onCompletion,
498498
final Thread originalThread
499499
) {
500-
while (it.hasNext()) {
501-
final String pipelineId = it.next();
502-
try {
503-
PipelineHolder holder = pipelines.get(pipelineId);
504-
if (holder == null) {
505-
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
500+
assert it.hasNext();
501+
final String pipelineId = it.next();
502+
try {
503+
PipelineHolder holder = pipelines.get(pipelineId);
504+
if (holder == null) {
505+
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
506+
}
507+
Pipeline pipeline = holder.pipeline;
508+
String originalIndex = indexRequest.indices()[0];
509+
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
510+
if (e != null) {
511+
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
512+
pipelineId, indexRequest.index(), indexRequest.id()), e);
513+
onFailure.accept(slot, e);
506514
}
507-
Pipeline pipeline = holder.pipeline;
508-
String originalIndex = indexRequest.indices()[0];
509-
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
510-
if (e != null) {
511-
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
512-
pipelineId, indexRequest.index(), indexRequest.id()), e);
513-
onFailure.accept(slot, e);
514-
}
515515

516-
Iterator<String> newIt = it;
517-
boolean newHasFinalPipeline = hasFinalPipeline;
518-
String newIndex = indexRequest.indices()[0];
516+
Iterator<String> newIt = it;
517+
boolean newHasFinalPipeline = hasFinalPipeline;
518+
String newIndex = indexRequest.indices()[0];
519519

520-
if (Objects.equals(originalIndex, newIndex) == false) {
521-
if (hasFinalPipeline && it.hasNext() == false) {
522-
totalMetrics.ingestFailed();
523-
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
524-
"] can't change the target index"));
520+
if (Objects.equals(originalIndex, newIndex) == false) {
521+
if (hasFinalPipeline && it.hasNext() == false) {
522+
totalMetrics.ingestFailed();
523+
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
524+
"] can't change the target index"));
525+
} else {
526+
indexRequest.isPipelineResolved(false);
527+
resolvePipelines(null, indexRequest, state.metadata());
528+
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
529+
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
530+
newHasFinalPipeline = true;
525531
} else {
526-
527-
//Drain old it so it's not looped over
528-
it.forEachRemaining($ -> {
529-
});
530-
indexRequest.isPipelineResolved(false);
531-
resolvePipelines(null, indexRequest, state.metadata());
532-
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
533-
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
534-
newHasFinalPipeline = true;
535-
} else {
536-
newIt = Collections.emptyIterator();
537-
}
532+
newIt = Collections.emptyIterator();
538533
}
539534
}
535+
}
540536

541-
if (newIt.hasNext()) {
542-
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
543-
originalThread);
544-
} else {
545-
if (counter.decrementAndGet() == 0) {
546-
onCompletion.accept(originalThread, null);
547-
}
548-
assert counter.get() >= 0;
537+
if (newIt.hasNext()) {
538+
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
539+
originalThread);
540+
} else {
541+
if (counter.decrementAndGet() == 0) {
542+
onCompletion.accept(originalThread, null);
549543
}
550-
});
551-
} catch (Exception e) {
552-
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
553-
pipelineId, indexRequest.index(), indexRequest.id()), e);
554-
onFailure.accept(slot, e);
555-
if (counter.decrementAndGet() == 0) {
556-
onCompletion.accept(originalThread, null);
544+
assert counter.get() >= 0;
557545
}
558-
assert counter.get() >= 0;
559-
break;
546+
});
547+
} catch (Exception e) {
548+
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
549+
pipelineId, indexRequest.index(), indexRequest.id()), e);
550+
onFailure.accept(slot, e);
551+
if (counter.decrementAndGet() == 0) {
552+
onCompletion.accept(originalThread, null);
560553
}
554+
assert counter.get() >= 0;
561555
}
562556
}
563557

0 commit comments

Comments
 (0)