@@ -947,10 +947,8 @@ private void innerExecute(
947
947
boolean ensureNoSelfReferences = ingestDocument .doNoSelfReferencesCheck ();
948
948
indexRequest .source (ingestDocument .getSource (), indexRequest .getContentType (), ensureNoSelfReferences );
949
949
} catch (IllegalArgumentException ex ) {
950
- // An IllegalArgumentException can be thrown when an ingest
951
- // processor creates a source map that is self-referencing.
952
- // In that case, we catch and wrap the exception so we can
953
- // include which pipeline failed.
950
+ // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
951
+ // In that case, we catch and wrap the exception, so we can include which pipeline failed.
954
952
totalMetrics .ingestFailed ();
955
953
handler .accept (
956
954
new IllegalArgumentException (
@@ -959,6 +957,19 @@ private void innerExecute(
959
957
)
960
958
);
961
959
return ;
960
+ } catch (Exception ex ) {
961
+ // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
962
+ // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
963
+ // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
964
+ // no self references.
965
+ totalMetrics .ingestFailed ();
966
+ handler .accept (
967
+ new RuntimeException (
968
+ "Failed to generate the source document for ingest pipeline [" + pipeline .getId () + "]" ,
969
+ ex
970
+ )
971
+ );
972
+ return ;
962
973
}
963
974
Map <String , String > map ;
964
975
if ((map = metadata .getDynamicTemplates ()) != null ) {
0 commit comments