diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index 8baccef4c1..abc12407c5 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -235,7 +235,7 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil if (!coreConfiguration.isActive()) { transaction = noopTransaction(); } else { - transaction = transactionPool.createInstance().start(childContextCreator, parent, epochMicros, sampler); + transaction = createTransaction().start(childContextCreator, parent, epochMicros, sampler); } if (logger.isDebugEnabled()) { logger.debug("startTransaction {} {", transaction); @@ -252,7 +252,16 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil } public Transaction noopTransaction() { - return transactionPool.createInstance().startNoop(); + return createTransaction().startNoop(); + } + + private Transaction createTransaction() { + Transaction transaction = transactionPool.createInstance(); + while (transaction.getReferenceCount() != 0) { + logger.warn("Tried to start a transaction with a non-zero reference count {} {}", transaction.getReferenceCount(), transaction); + transaction = transactionPool.createInstance(); + } + return transaction; } @Nullable @@ -295,7 +304,7 @@ public Span startSpan(AbstractSpan parent, long epochMicros) { * @see #startSpan(TraceContext.ChildContextCreator, Object) */ public Span startSpan(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros) { - Span span = spanPool.createInstance(); + Span span = createSpan(); final boolean dropped; Transaction transaction = currentTransaction(); if (transaction != null) { @@ -313,6 +322,15 @@ public Span startSpan(TraceContext.ChildContextCreator childContextCreato return span; } + private Span createSpan() { + Span span = spanPool.createInstance(); + while (span.getReferenceCount() != 0) { + logger.warn("Tried to start a span with a non-zero reference count {} {}", span.getReferenceCount(), span); + span = spanPool.createInstance(); + } + return span; + } + private boolean isTransactionSpanLimitReached(Transaction transaction) { return coreConfiguration.getTransactionMaxSpans() <= transaction.getSpanCount().getStarted().get(); } @@ -372,7 +390,7 @@ public void endTransaction(Transaction transaction) { // we do report non-sampled transactions (without the context) reporter.report(transaction); } else { - transaction.recycle(); + transaction.decrementReferences(); } } @@ -387,7 +405,7 @@ public void endSpan(Span span) { } reporter.report(span); } else { - span.recycle(); + span.decrementReferences(); } } @@ -489,14 +507,14 @@ public List getActivationListeners() { public void activate(TraceContextHolder holder) { if (logger.isDebugEnabled()) { - logger.debug("Activating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId()); + logger.debug("Activating {} on thread {}", holder, Thread.currentThread().getId()); } activeStack.get().push(holder); } public void deactivate(TraceContextHolder holder) { if (logger.isDebugEnabled()) { - logger.debug("Deactivating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId()); + logger.debug("Deactivating {} on thread {}", holder, Thread.currentThread().getId()); } final Deque> stack = activeStack.get(); assertIsActive(holder, stack.poll()); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java index d44e1eb9f6..144e4fddd6 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java @@ -59,8 +59,11 @@ protected boolean beforeDelegation(final AbstractSpan localSpan) { protected void afterDelegation(final AbstractSpan localSpan, boolean activated) { try { - if (localSpan != null && activated) { - localSpan.deactivate(); + if (localSpan != null) { + if (activated) { + localSpan.deactivate(); + } + localSpan.decrementReferences(); } doRecycle(); } catch (Throwable t) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java index 479b06f9c4..3a62b98099 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java @@ -48,6 +48,7 @@ public SpanInScopeCallableWrapper(ElasticApmTracer tracer) { public SpanInScopeCallableWrapper wrap(Callable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java index 88c56af95c..7f69da5a4f 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java @@ -47,6 +47,7 @@ public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) { public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java index 86a8a057dd..7934d8a6e9 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java @@ -68,13 +68,6 @@ public int getPayloadSize() { return errors.size(); } - @Override - public void recycle() { - for (ErrorCapture error : errors) { - error.recycle(); - } - } - @Override public void resetState() { errors.clear(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java index 87c78b5d95..ed6d165948 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java @@ -81,5 +81,4 @@ public SystemInfo getSystem() { public abstract int getPayloadSize(); - public abstract void recycle(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java index 16e78449c0..a318854583 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java @@ -78,13 +78,4 @@ public int getPayloadSize() { return transactions.size() + spans.size(); } - @Override - public void recycle() { - for (int i = 0; i < transactions.size(); i++) { - transactions.get(i).recycle(); - } - for (int i = 0; i < spans.size(); i++) { - spans.get(i).recycle(); - } - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index fe584a77be..8193b6c274 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -32,33 +32,40 @@ import javax.annotation.Nullable; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public abstract class AbstractSpan extends TraceContextHolder { private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class); protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1); protected final TraceContext traceContext; - // used to mark this span as expected to switch lifecycle-managing-thread, eg span created by one thread and ended by another - private volatile boolean isLifecycleManagingThreadSwitch; - /** * Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id') */ protected final StringBuilder name = new StringBuilder(); private long timestamp; + /** * How long the transaction took to complete, in ms with 3 decimal points * (Required) */ protected double duration; + protected AtomicInteger references = new AtomicInteger(); + protected volatile boolean finished = true; - private volatile boolean finished = true; + public int getReferenceCount() { + return references.get(); + } public AbstractSpan(ElasticApmTracer tracer) { super(tracer); traceContext = TraceContext.with64BitId(this.tracer); } + public boolean isReferenced() { + return references.get() > 0; + } + /** * How long the transaction took to complete, in ms with 3 decimal points * (Required) @@ -116,8 +123,8 @@ public void resetState() { name.setLength(0); timestamp = 0; duration = 0; - isLifecycleManagingThreadSwitch = false; traceContext.resetState(); + references.set(0); } public boolean isChildOf(AbstractSpan parent) { @@ -129,6 +136,7 @@ public Span createSpan() { return createSpan(traceContext.getClock().getEpochMicros()); } + @Override public Span createSpan(long epochMicros) { return tracer.startSpan(this, epochMicros); } @@ -153,8 +161,14 @@ public void addLabel(String key, Boolean value) { public abstract AbstractContext getContext(); - protected void onStart() { + /** + * Called after the span has been started and its parent references are set + */ + protected void onAfterStart() { this.finished = false; + // this final reference is decremented when the span is reported + // or even after its reported and the last child span is ended + incrementReferences(); } public void end() { @@ -163,12 +177,13 @@ public void end() { public final void end(long epochMicros) { if (!finished) { - this.finished = true; - this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS; + this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS; if (name.length() == 0) { name.append("unnamed"); } doEnd(epochMicros); + // has to be set last so doEnd callbacks don't think it has already been finished + this.finished = true; } else { logger.warn("End has already been called: {}", this); assert false; @@ -182,20 +197,19 @@ public boolean isChildOf(TraceContextHolder other) { return getTraceContext().isChildOf(other); } - public void markLifecycleManagingThreadSwitchExpected() { - isLifecycleManagingThreadSwitch = true; + @Override + public T activate() { + incrementReferences(); + return super.activate(); } @Override - public T activate() { - if (isLifecycleManagingThreadSwitch) { - // This serves two goals: - // 1. resets the lifecycle management flag, so that the executing thread will remain in charge until set otherwise - // by setting this flag once more - // 2. reading this volatile field when span is activated on a new thread ensures proper visibility of other span data - isLifecycleManagingThreadSwitch = false; + public T deactivate() { + try { + return super.deactivate(); + } finally { + decrementReferences(); } - return super.activate(); } /** @@ -208,11 +222,7 @@ public T activate() { */ @Override public Runnable withActive(Runnable runnable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapRunnable(runnable, this); - } else { - return tracer.wrapRunnable(runnable, traceContext); - } + return tracer.wrapRunnable(runnable, this); } /** @@ -225,15 +235,32 @@ public Runnable withActive(Runnable runnable) { */ @Override public Callable withActive(Callable callable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapCallable(callable, this); - } else { - return tracer.wrapCallable(callable, traceContext); - } + return tracer.wrapCallable(callable, this); } public void setStartTimestamp(long epochMicros) { timestamp = epochMicros; } + public void incrementReferences() { + references.incrementAndGet(); + if (logger.isDebugEnabled()) { + logger.debug("increment references to {} ({})", this, references); + if (logger.isTraceEnabled()) { + logger.trace("incrementing references at", + new RuntimeException("This is an expected exception. Is just used to record where the reference count has been incremented.")); + } + } + } + + public void decrementReferences() { + if (logger.isDebugEnabled()) { + logger.debug("decrement references to {} ({})", this, references); + if (logger.isTraceEnabled()) { + logger.trace("decrementing references at", + new RuntimeException("This is an expected exception. Is just used to record where the reference count has been decremented.")); + } + } + } + } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java index 1621ec5bb7..91eca8a276 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java @@ -63,14 +63,27 @@ public class Span extends AbstractSpan implements Recyclable { private final SpanContext context = new SpanContext(); @Nullable private Throwable stacktrace; + @Nullable + private AbstractSpan parent; + @Nullable + private Transaction transaction; public Span(ElasticApmTracer tracer) { super(tracer); } public Span start(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros, boolean dropped) { - onStart(); childContextCreator.asChildOf(traceContext, parentContext); + if (parentContext instanceof Transaction) { + this.transaction = (Transaction) parentContext; + this.parent = this.transaction; + this.parent.incrementReferences(); + } else if (parentContext instanceof Span) { + final Span parentSpan = (Span) parentContext; + this.parent = parentSpan; + this.transaction = parentSpan.transaction; + this.parent.incrementReferences(); + } if (dropped) { traceContext.setRecorded(false); } @@ -86,6 +99,7 @@ public Span start(TraceContext.ChildContextCreator childContextCreator, T new RuntimeException("this exception is just used to record where the span has been started from")); } } + onAfterStart(); return this; } @@ -184,6 +198,9 @@ public void doEnd(long epochMicros) { if (type == null) { type = "custom"; } + if (parent != null) { + parent.decrementReferences(); + } this.tracer.endSpan(this); } @@ -195,19 +212,37 @@ public void resetState() { type = null; subtype = null; action = null; - } - - public void recycle() { - tracer.recycle(this); + parent = null; + transaction = null; } @Override public String toString() { - return String.format("'%s' %s", name, traceContext); + return String.format("'%s' %s (%s)", name, traceContext, Integer.toHexString(System.identityHashCode(this))); } public Span withStacktrace(Throwable stacktrace) { this.stacktrace = stacktrace; return this; } + + @Override + public void incrementReferences() { + if (transaction != null) { + transaction.incrementReferences(); + } + super.incrementReferences(); + } + + @Override + public void decrementReferences() { + if (transaction != null) { + transaction.decrementReferences(); + } + final int referenceCount = references.decrementAndGet(); + super.decrementReferences(); + if (referenceCount == 0) { + tracer.recycle(this); + } + } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java index a62ee2a63d..a8ba7aa366 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java @@ -399,6 +399,11 @@ public Span createSpan() { return tracer.startSpan(fromParent(), this); } + @Override + public Span createSpan(long epochMicros) { + return tracer.startSpan(fromParent(), this, epochMicros); + } + public interface ChildContextCreator { boolean asChildOf(TraceContext child, T parent); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java index c3e6df359e..390be5ddd4 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java @@ -75,6 +75,8 @@ public TraceContextHolder asExit() { public abstract Span createSpan(); + public abstract Span createSpan(long epochMicros); + /** * Creates a child Span representing a remote call event, unless this TraceContextHolder already represents an exit event. * If current TraceContextHolder is representing an Exit- returns null diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index 2774b77a4d..0f8545122d 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -27,6 +27,8 @@ import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.context.TransactionContext; import co.elastic.apm.agent.impl.sampling.Sampler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -35,6 +37,8 @@ */ public class Transaction extends AbstractSpan { + private static final Logger logger = LoggerFactory.getLogger(Transaction.class); + public static final String TYPE_REQUEST = "request"; /** @@ -68,7 +72,6 @@ public Transaction(ElasticApmTracer tracer) { } public Transaction start(TraceContext.ChildContextCreator childContextCreator, @Nullable T parent, long epochMicros, Sampler sampler) { - onStart(); if (parent == null || !childContextCreator.asChildOf(traceContext, parent)) { traceContext.asRootSpan(sampler); } @@ -77,13 +80,14 @@ public Transaction start(TraceContext.ChildContextCreator childContextCre } else { setStartTimestamp(traceContext.getClock().getEpochMicros()); } + onAfterStart(); return this; } public Transaction startNoop() { - onStart(); this.name.append("noop"); this.noop = true; + onAfterStart(); return this; } @@ -171,10 +175,6 @@ public void resetState() { type = null; } - public void recycle() { - tracer.recycle(this); - } - public boolean isNoop() { return noop; } @@ -193,6 +193,19 @@ public String getType() { @Override public String toString() { - return String.format("'%s' %s", name, traceContext); + return String.format("'%s' %s (%s)", name, traceContext, Integer.toHexString(System.identityHashCode(this))); + } + + @Override + public void incrementReferences() { + super.incrementReferences(); + } + + public void decrementReferences() { + final int referenceCount = this.references.decrementAndGet(); + super.decrementReferences(); + if (referenceCount == 0) { + tracer.recycle(this); + } } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index fbfbeb89cd..94b181d926 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -120,7 +120,7 @@ public Thread newThread(Runnable r) { @Override public void report(Transaction transaction) { if (!tryAddEventToRingBuffer(transaction, TRANSACTION_EVENT_TRANSLATOR)) { - transaction.recycle(); + transaction.decrementReferences(); } if (syncReport) { waitForFlush(); @@ -130,7 +130,7 @@ public void report(Transaction transaction) { @Override public void report(Span span) { if (!tryAddEventToRingBuffer(span, SPAN_EVENT_TRANSLATOR)) { - span.recycle(); + span.decrementReferences(); } if (syncReport) { waitForFlush(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java index 38059c1908..aacac3d462 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java @@ -192,11 +192,11 @@ private void writeEvent(ReportingEvent event) { if (event.getTransaction() != null) { currentlyTransmitting++; payloadSerializer.serializeTransactionNdJson(event.getTransaction()); - event.getTransaction().recycle(); + event.getTransaction().decrementReferences(); } else if (event.getSpan() != null) { currentlyTransmitting++; payloadSerializer.serializeSpanNdJson(event.getSpan()); - event.getSpan().recycle(); + event.getSpan().decrementReferences(); } else if (event.getError() != null) { currentlyTransmitting++; payloadSerializer.serializeErrorNdJson(event.getError()); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java index d3275beaa2..990819a975 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java @@ -29,10 +29,13 @@ import co.elastic.apm.agent.impl.payload.PayloadUtils; import co.elastic.apm.agent.impl.payload.TransactionPayload; import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.MetricRegistry; +import co.elastic.apm.agent.report.IntakeV2ReportingEventHandler; import co.elastic.apm.agent.report.Reporter; +import co.elastic.apm.agent.report.ReportingEvent; import co.elastic.apm.agent.report.serialize.DslJsonSerializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -248,4 +251,23 @@ public void reset() { errors.clear(); spans.clear(); } + + /** + * Calls {@link AbstractSpan#decrementReferences()} for all reported transactions and spans to emulate the references being decremented + * after reporting to the APM Server. + * See {@link IntakeV2ReportingEventHandler#writeEvent(ReportingEvent)} + */ + public void decrementReferences() { + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + } + + public void assertRecycledAfterDecrementingReferences() { + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isFalse()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isFalse()); + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isTrue()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isTrue()); + } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java index 1bb462456b..215f0f73a5 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java @@ -361,7 +361,8 @@ void testStartSpanAfterTransactionHasEnded() { final Transaction transaction = tracerImpl.startTransaction(TraceContext.asRoot(), null, getClass().getClassLoader()); final TraceContext transactionTraceContext = transaction.getTraceContext().copy(); transaction.end(); - transaction.resetState(); + + reporter.assertRecycledAfterDecrementingReferences(); tracerImpl.activate(transactionTraceContext); try { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java index c44e357c76..18259c17a4 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java @@ -113,7 +113,6 @@ void testMissingDeactivation() { void testContextAndSpanRunnableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(transaction.withActive((Runnable) () -> assertThat(tracer.getActive()).isSameAs(transaction))).run(); transaction.deactivate(); @@ -126,7 +125,6 @@ void testContextAndSpanRunnableActivation() { void testContextAndSpanCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(transaction.withActive(() -> tracer.currentTransaction())).call()).isSameAs(transaction); } catch (Exception e) { @@ -144,7 +142,6 @@ void testSpanAndContextRunnableActivation() { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive((Runnable) () -> assertThat(tracer.currentTransaction()).isSameAs(transaction)); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(runnable).run(); transaction.deactivate(); @@ -157,7 +154,6 @@ void testSpanAndContextCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> tracer.currentTransaction()); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(callable).call()).isSameAs(transaction); } catch (Exception e) { @@ -172,7 +168,6 @@ void testSpanAndContextCallableActivation() { @Test void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); assertThat(tracer.currentTransaction()).isSameAs(transaction); @@ -185,7 +180,6 @@ void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { @Test void testContextAndSpanCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Future transactionFuture = Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); return tracer.currentTransaction(); @@ -201,9 +195,8 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive(() -> { assertThat(tracer.currentTransaction()).isSameAs(transaction); - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.getActive()).isSameAs(transaction); }); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(runnable)).get(); transaction.deactivate(); @@ -214,13 +207,31 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { void testSpanAndContextCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> { - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.currentTransaction()).isSameAs(transaction); return tracer.currentTransaction(); }); - transaction.markLifecycleManagingThreadSwitchExpected(); assertThat(Executors.newSingleThreadExecutor().submit(transaction.withActive(callable)).get()).isSameAs(transaction); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); } + + @Test + void testAsyncActivationAfterEnd() throws Exception { + final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); + Callable callable = transaction.withActive(() -> { + assertThat(tracer.getActive()).isSameAs(transaction); + return tracer.currentTransaction(); + }); + transaction.deactivate().end(); + reporter.decrementReferences(); + assertThat(transaction.isReferenced()).isTrue(); + + assertThat(Executors.newSingleThreadExecutor().submit(callable).get()).isSameAs(transaction); + assertThat(transaction.isReferenced()).isFalse(); + // recycled because the transaction is finished, reported and the reference counter is 0 + assertThat(transaction.getTraceContext().getTraceId().isEmpty()).isTrue(); + + assertThat(tracer.getActive()).isNull(); + } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java index 88beca78fb..95398d4753 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java @@ -88,8 +88,6 @@ private static void onBeforeExecute(@Advice.Argument(0) String method, span = helper.createClientSpan(method, endpoint, entity); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java index ae1d0ecc90..ac0a4a0655 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java @@ -80,8 +80,6 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, span = helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity()); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java index 3f423dd9f8..3de4ee6256 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java @@ -65,6 +65,11 @@ public abstract class ExecutorInstrumentation extends ElasticApmInstrumentation // so this pool only works when called directly from ManagedExecutorServiceImpl // excluding this class from instrumentation does not work as it inherits the execute and submit methods excludedClasses.add("org.glassfish.enterprise.concurrent.internal.ManagedThreadPoolExecutor"); + // Used in Tomcat 7 + // Especially the wrapping of org.apache.tomcat.util.net.AprEndpoint$SocketProcessor is problematic + // because that is the Runnable for the actual request processor thread. + // Wrapping that leaks transactions and spans to other requests. + excludedClasses.add("org.apache.tomcat.util.threads.ThreadPoolExecutor"); } @Override diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java index 919463d63b..82396f2999 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorServiceDoubleWrappingTest.java @@ -47,7 +47,6 @@ public class ExecutorServiceDoubleWrappingTest extends AbstractInstrumentationTe @Before public void setUp() { transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).withName("Transaction").activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); } @After diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java index f53784fdf6..acaa74d5fa 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java @@ -25,8 +25,8 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; -import co.elastic.apm.agent.impl.async.ContextInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.ContextInScopeRunnableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; import co.elastic.apm.agent.impl.transaction.TraceContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -52,7 +52,7 @@ void setUp() { executor = ExecutorServiceWrapper.wrap(new ForkJoinPool() { @Override public ForkJoinTask submit(Runnable task) { - if (task instanceof ContextInScopeRunnableWrapper) { + if (task instanceof SpanInScopeRunnableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new ClassCastException(); } @@ -61,7 +61,7 @@ public ForkJoinTask submit(Runnable task) { @Override public ForkJoinTask submit(Callable task) { - if (task instanceof ContextInScopeCallableWrapper) { + if (task instanceof SpanInScopeCallableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new IllegalArgumentException(); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java index 70a627b381..22c51dec5e 100644 --- a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java @@ -95,7 +95,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, okhttp3.Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().host()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java index 71563606a3..d488744634 100644 --- a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java @@ -94,7 +94,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().getHost()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java index bc6d6f6ddc..c39b9678d8 100644 --- a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java +++ b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java @@ -180,7 +180,6 @@ private static void onEnterAsyncContextStart(@Advice.Argument(value = 0, readOnl if (tracer != null && runnable != null && tracer.isWrappingAllowedOnThread()) { final Transaction transaction = tracer.currentTransaction(); if (transaction != null) { - transaction.markLifecycleManagingThreadSwitchExpected(); runnable = transaction.withActive(runnable); tracer.avoidWrappingOnThread(); } diff --git a/integration-tests/application-server-integration-tests/src/test/java/co/elastic/apm/servlet/AbstractServletContainerIntegrationTest.java b/integration-tests/application-server-integration-tests/src/test/java/co/elastic/apm/servlet/AbstractServletContainerIntegrationTest.java index 8f2ad2f8fa..8d94445aae 100644 --- a/integration-tests/application-server-integration-tests/src/test/java/co/elastic/apm/servlet/AbstractServletContainerIntegrationTest.java +++ b/integration-tests/application-server-integration-tests/src/test/java/co/elastic/apm/servlet/AbstractServletContainerIntegrationTest.java @@ -122,10 +122,10 @@ protected AbstractServletContainerIntegrationTest(GenericContainer servletCon this.expectedDefaultServiceName = expectedDefaultServiceName; servletContainer .withNetwork(Network.SHARED) - .withEnv("ELASTIC_APM_SERVER_URL", "http://apm-server:1080") + .withEnv("ELASTIC_APM_SERVER_URLS", "http://apm-server:1080") .withEnv("ELASTIC_APM_IGNORE_URLS", "/status*,/favicon.ico") .withEnv("ELASTIC_APM_REPORT_SYNC", "true") - .withEnv("ELASTIC_APM_LOGGING_LOG_LEVEL", "DEBUG") + .withEnv("ELASTIC_APM_LOG_LEVEL", "DEBUG") .withEnv("ELASTIC_APM_CAPTURE_BODY", "all") .withLogConsumer(new StandardOutLogConsumer().withPrefix(containerName)) .withExposedPorts(webPort)