@@ -497,67 +497,61 @@ private void executePipelines(
497
497
final BiConsumer <Thread , Exception > onCompletion ,
498
498
final Thread originalThread
499
499
) {
500
- while (it .hasNext ()) {
501
- final String pipelineId = it .next ();
502
- try {
503
- PipelineHolder holder = pipelines .get (pipelineId );
504
- if (holder == null ) {
505
- throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
500
+ assert it .hasNext ();
501
+ final String pipelineId = it .next ();
502
+ try {
503
+ PipelineHolder holder = pipelines .get (pipelineId );
504
+ if (holder == null ) {
505
+ throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
506
+ }
507
+ Pipeline pipeline = holder .pipeline ;
508
+ String originalIndex = indexRequest .indices ()[0 ];
509
+ innerExecute (slot , indexRequest , pipeline , onDropped , e -> {
510
+ if (e != null ) {
511
+ logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
512
+ pipelineId , indexRequest .index (), indexRequest .id ()), e );
513
+ onFailure .accept (slot , e );
506
514
}
507
- Pipeline pipeline = holder .pipeline ;
508
- String originalIndex = indexRequest .indices ()[0 ];
509
- innerExecute (slot , indexRequest , pipeline , onDropped , e -> {
510
- if (e != null ) {
511
- logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
512
- pipelineId , indexRequest .index (), indexRequest .id ()), e );
513
- onFailure .accept (slot , e );
514
- }
515
515
516
- Iterator <String > newIt = it ;
517
- boolean newHasFinalPipeline = hasFinalPipeline ;
518
- String newIndex = indexRequest .indices ()[0 ];
516
+ Iterator <String > newIt = it ;
517
+ boolean newHasFinalPipeline = hasFinalPipeline ;
518
+ String newIndex = indexRequest .indices ()[0 ];
519
519
520
- if (Objects .equals (originalIndex , newIndex ) == false ) {
521
- if (hasFinalPipeline && it .hasNext () == false ) {
522
- totalMetrics .ingestFailed ();
523
- onFailure .accept (slot , new IllegalStateException ("final pipeline [" + pipelineId +
524
- "] can't change the target index" ));
520
+ if (Objects .equals (originalIndex , newIndex ) == false ) {
521
+ if (hasFinalPipeline && it .hasNext () == false ) {
522
+ totalMetrics .ingestFailed ();
523
+ onFailure .accept (slot , new IllegalStateException ("final pipeline [" + pipelineId +
524
+ "] can't change the target index" ));
525
+ } else {
526
+ indexRequest .isPipelineResolved (false );
527
+ resolvePipelines (null , indexRequest , state .metadata ());
528
+ if (IngestService .NOOP_PIPELINE_NAME .equals (indexRequest .getFinalPipeline ()) == false ) {
529
+ newIt = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
530
+ newHasFinalPipeline = true ;
525
531
} else {
526
-
527
- //Drain old it so it's not looped over
528
- it .forEachRemaining ($ -> {
529
- });
530
- indexRequest .isPipelineResolved (false );
531
- resolvePipelines (null , indexRequest , state .metadata ());
532
- if (IngestService .NOOP_PIPELINE_NAME .equals (indexRequest .getFinalPipeline ()) == false ) {
533
- newIt = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
534
- newHasFinalPipeline = true ;
535
- } else {
536
- newIt = Collections .emptyIterator ();
537
- }
532
+ newIt = Collections .emptyIterator ();
538
533
}
539
534
}
535
+ }
540
536
541
- if (newIt .hasNext ()) {
542
- executePipelines (slot , newIt , newHasFinalPipeline , indexRequest , onDropped , onFailure , counter , onCompletion ,
543
- originalThread );
544
- } else {
545
- if (counter .decrementAndGet () == 0 ) {
546
- onCompletion .accept (originalThread , null );
547
- }
548
- assert counter .get () >= 0 ;
537
+ if (newIt .hasNext ()) {
538
+ executePipelines (slot , newIt , newHasFinalPipeline , indexRequest , onDropped , onFailure , counter , onCompletion ,
539
+ originalThread );
540
+ } else {
541
+ if (counter .decrementAndGet () == 0 ) {
542
+ onCompletion .accept (originalThread , null );
549
543
}
550
- });
551
- } catch (Exception e ) {
552
- logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
553
- pipelineId , indexRequest .index (), indexRequest .id ()), e );
554
- onFailure .accept (slot , e );
555
- if (counter .decrementAndGet () == 0 ) {
556
- onCompletion .accept (originalThread , null );
544
+ assert counter .get () >= 0 ;
557
545
}
558
- assert counter .get () >= 0 ;
559
- break ;
546
+ });
547
+ } catch (Exception e ) {
548
+ logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
549
+ pipelineId , indexRequest .index (), indexRequest .id ()), e );
550
+ onFailure .accept (slot , e );
551
+ if (counter .decrementAndGet () == 0 ) {
552
+ onCompletion .accept (originalThread , null );
560
553
}
554
+ assert counter .get () >= 0 ;
561
555
}
562
556
}
563
557
0 commit comments