Skip to content

Commit e21be6e

Browse files
authored
Preventing serialization errors in the nodes stats API (#90319)
Preventing serialization errors in the nodes stats API, and adding logging to the ingest counter code so that we can find the root cause of the problem in the future.
1 parent 55c489d commit e21be6e

File tree

7 files changed

+168
-73
lines changed

7 files changed

+168
-73
lines changed

docs/changelog/90319.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 90319
2+
summary: Preventing serialization errors in the nodes stats API
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 77973

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchException;
1214
import org.elasticsearch.core.Tuple;
1315

@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124
import java.util.stream.Collectors;
@@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor {
3033
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
3134
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
3235

36+
private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);
37+
3338
private final boolean ignoreFailure;
3439
private final List<Processor> processors;
3540
private final List<Processor> onFailureProcessors;
@@ -191,25 +196,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
191196
final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2();
192197
final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1();
193198
final IngestDocument finalIngestDocument = ingestDocument;
199+
/*
200+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
201+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
202+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
203+
* is only executed once.
204+
*/
205+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
194206
finalMetric.preIngest();
207+
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
195208
try {
196209
finalProcessor.execute(ingestDocument, (result, e) -> {
197-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
198-
finalMetric.postIngest(ingestTimeInNanos);
199-
200-
if (e != null) {
201-
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
210+
if (listenerHasBeenCalled.getAndSet(true)) {
211+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
212+
assert false : "A listener was unexpectedly called more than once";
202213
} else {
203-
if (result != null) {
204-
innerExecute(nextProcessor, result, handler);
214+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
215+
finalMetric.postIngest(ingestTimeInNanos);
216+
postIngestHasBeenCalled.set(true);
217+
if (e != null) {
218+
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
205219
} else {
206-
handler.accept(null, null);
220+
if (result != null) {
221+
innerExecute(nextProcessor, result, handler);
222+
} else {
223+
handler.accept(null, null);
224+
}
207225
}
208226
}
209227
});
210228
} catch (Exception e) {
211229
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
212-
finalMetric.postIngest(ingestTimeInNanos);
230+
if (postIngestHasBeenCalled.get()) {
231+
logger.warn("Preventing postIngest from being called more than once", new RuntimeException());
232+
assert false : "Attempt to call postIngest more than once";
233+
} else {
234+
finalMetric.postIngest(ingestTimeInNanos);
235+
}
213236
executeOnFailureOuter(currentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
214237
}
215238
}

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.logging.DeprecationCategory;
1214
import org.elasticsearch.common.logging.DeprecationLogger;
1315
import org.elasticsearch.script.DynamicMap;
@@ -26,6 +28,7 @@
2628
import java.util.ListIterator;
2729
import java.util.Map;
2830
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.function.BiConsumer;
3033
import java.util.function.Function;
3134
import java.util.function.LongSupplier;
@@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
4548
return value;
4649
});
4750

51+
private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);
52+
4853
static final String TYPE = "conditional";
4954

5055
private final Script condition;
@@ -120,15 +125,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
120125

121126
if (matches) {
122127
final long startTimeInNanos = relativeTimeProvider.getAsLong();
128+
/*
129+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
130+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
131+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
132+
* is only executed once.
133+
*/
134+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
123135
metric.preIngest();
124136
processor.execute(ingestDocument, (result, e) -> {
125-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
126-
metric.postIngest(ingestTimeInNanos);
127-
if (e != null) {
128-
metric.ingestFailed();
129-
handler.accept(null, e);
137+
if (listenerHasBeenCalled.getAndSet(true)) {
138+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
139+
assert false : "A listener was unexpectedly called more than once";
130140
} else {
131-
handler.accept(result, null);
141+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
142+
metric.postIngest(ingestTimeInNanos);
143+
if (e != null) {
144+
metric.ingestFailed();
145+
handler.accept(null, e);
146+
} else {
147+
handler.accept(result, null);
148+
}
132149
}
133150
});
134151
} else {

server/src/main/java/org/elasticsearch/ingest/IngestMetric.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.metrics.CounterMetric;
1214

1315
import java.util.concurrent.TimeUnit;
@@ -22,6 +24,8 @@
2224
*/
2325
class IngestMetric {
2426

27+
private static final Logger logger = LogManager.getLogger(IngestMetric.class);
28+
2529
/**
2630
* The time it takes to complete the measured item.
2731
*/
@@ -53,7 +57,19 @@ void preIngest() {
5357
*/
5458
void postIngest(long ingestTimeInNanos) {
5559
long current = ingestCurrent.decrementAndGet();
56-
assert current >= 0 : "ingest metric current count double-decremented";
60+
if (current < 0) {
61+
/*
62+
* This ought to never happen. However if it does, it's incredibly bad because ingestCurrent being negative causes a
63+
* serialization error that prevents the nodes stats API from working. So we're doing 3 things here:
64+
* (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the
65+
* bug if it still exists
66+
* (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug
67+
* (3) Increment the counter back up so that we don't hit serialization failures
68+
*/
69+
logger.warn("Current ingest counter decremented below 0", new RuntimeException());
70+
assert false : "ingest metric current count double-decremented";
71+
ingestCurrent.incrementAndGet();
72+
}
5773
this.ingestTimeInNanos.inc(ingestTimeInNanos);
5874
ingestCount.inc();
5975
}
@@ -84,6 +100,8 @@ void add(IngestMetric metrics) {
84100
IngestStats.Stats createStats() {
85101
// we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
86102
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
87-
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count());
103+
// It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319
104+
long currentCount = Math.max(0, ingestCurrent.get());
105+
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count());
88106
}
89107
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Objects;
7373
import java.util.Set;
7474
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.atomic.AtomicBoolean;
7576
import java.util.concurrent.atomic.AtomicInteger;
7677
import java.util.function.BiConsumer;
7778
import java.util.function.Consumer;
@@ -911,60 +912,72 @@ private void innerExecute(
911912
VersionType versionType = indexRequest.versionType();
912913
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
913914
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
915+
/*
916+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
917+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
918+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
919+
* is only executed once.
920+
*/
921+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
914922
ingestDocument.executePipeline(pipeline, (result, e) -> {
915-
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
916-
totalMetrics.postIngest(ingestTimeInNanos);
917-
if (e != null) {
918-
totalMetrics.ingestFailed();
919-
handler.accept(e);
920-
} else if (result == null) {
921-
itemDroppedHandler.accept(slot);
922-
handler.accept(null);
923+
if (listenerHasBeenCalled.getAndSet(true)) {
924+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
925+
assert false : "A listener was unexpectedly called more than once";
923926
} else {
924-
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
925-
926-
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
927-
// before ingestion, which might also get modified during ingestion.
928-
indexRequest.index(metadata.getIndex());
929-
indexRequest.id(metadata.getId());
930-
indexRequest.routing(metadata.getRouting());
931-
indexRequest.version(metadata.getVersion());
932-
if (metadata.getVersionType() != null) {
933-
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
934-
}
935-
Number number;
936-
if ((number = metadata.getIfSeqNo()) != null) {
937-
indexRequest.setIfSeqNo(number.longValue());
938-
}
939-
if ((number = metadata.getIfPrimaryTerm()) != null) {
940-
indexRequest.setIfPrimaryTerm(number.longValue());
941-
}
942-
try {
943-
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
944-
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
945-
} catch (IllegalArgumentException ex) {
946-
// An IllegalArgumentException can be thrown when an ingest
947-
// processor creates a source map that is self-referencing.
948-
// In that case, we catch and wrap the exception so we can
949-
// include which pipeline failed.
927+
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
928+
totalMetrics.postIngest(ingestTimeInNanos);
929+
if (e != null) {
950930
totalMetrics.ingestFailed();
951-
handler.accept(
952-
new IllegalArgumentException(
953-
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
954-
ex
955-
)
956-
);
957-
return;
958-
}
959-
Map<String, String> map;
960-
if ((map = metadata.getDynamicTemplates()) != null) {
961-
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
962-
mergedDynamicTemplates.putAll(map);
963-
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
964-
}
965-
postIngest(ingestDocument, indexRequest);
931+
handler.accept(e);
932+
} else if (result == null) {
933+
itemDroppedHandler.accept(slot);
934+
handler.accept(null);
935+
} else {
936+
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
937+
938+
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
939+
// before ingestion, which might also get modified during ingestion.
940+
indexRequest.index(metadata.getIndex());
941+
indexRequest.id(metadata.getId());
942+
indexRequest.routing(metadata.getRouting());
943+
indexRequest.version(metadata.getVersion());
944+
if (metadata.getVersionType() != null) {
945+
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
946+
}
947+
Number number;
948+
if ((number = metadata.getIfSeqNo()) != null) {
949+
indexRequest.setIfSeqNo(number.longValue());
950+
}
951+
if ((number = metadata.getIfPrimaryTerm()) != null) {
952+
indexRequest.setIfPrimaryTerm(number.longValue());
953+
}
954+
try {
955+
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
956+
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
957+
} catch (IllegalArgumentException ex) {
958+
// An IllegalArgumentException can be thrown when an ingest
959+
// processor creates a source map that is self-referencing.
960+
// In that case, we catch and wrap the exception so we can
961+
// include which pipeline failed.
962+
totalMetrics.ingestFailed();
963+
handler.accept(
964+
new IllegalArgumentException(
965+
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
966+
ex
967+
)
968+
);
969+
return;
970+
}
971+
Map<String, String> map;
972+
if ((map = metadata.getDynamicTemplates()) != null) {
973+
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
974+
mergedDynamicTemplates.putAll(map);
975+
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
976+
}
977+
postIngest(ingestDocument, indexRequest);
966978

967-
handler.accept(null);
979+
handler.accept(null);
980+
}
968981
}
969982
});
970983
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchParseException;
1214
import org.elasticsearch.core.Nullable;
1315
import org.elasticsearch.script.ScriptService;
@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124

@@ -30,6 +33,8 @@ public final class Pipeline {
3033
public static final String ON_FAILURE_KEY = "on_failure";
3134
public static final String META_KEY = "_meta";
3235

36+
private static final Logger logger = LogManager.getLogger(Pipeline.class);
37+
3338
private final String id;
3439
@Nullable
3540
private final String description;
@@ -113,14 +118,26 @@ public static Pipeline create(
113118
*/
114119
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
115120
final long startTimeInNanos = relativeTimeProvider.getAsLong();
121+
/*
122+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
123+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
124+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
125+
* is only executed once.
126+
*/
127+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
116128
metrics.preIngest();
117129
compoundProcessor.execute(ingestDocument, (result, e) -> {
118-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
119-
metrics.postIngest(ingestTimeInNanos);
120-
if (e != null) {
121-
metrics.ingestFailed();
130+
if (listenerHasBeenCalled.getAndSet(true)) {
131+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
132+
assert false : "A listener was unexpectedly called more than once";
133+
} else {
134+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
135+
metrics.postIngest(ingestTimeInNanos);
136+
if (e != null) {
137+
metrics.ingestFailed();
138+
}
139+
handler.accept(result, e);
122140
}
123-
handler.accept(result, e);
124141
});
125142
}
126143

server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() {
4242

4343
// the second postIngest triggers an assertion error
4444
expectThrows(AssertionError.class, () -> metric.postIngest(500000L));
45-
assertThat(-1L, equalTo(metric.createStats().getIngestCurrent()));
45+
// We never allow the reported ingestCurrent to be negative:
46+
assertThat(metric.createStats().getIngestCurrent(), equalTo(0L));
4647
}
4748

4849
}

0 commit comments

Comments
 (0)