Skip to content

Combine continuation implementations into one which supports multiple activations #8324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ public static <K> void closeScope(
if (throwable != null) {
// This might lead to the continuation being consumed early, but it's better to be safe if we
// threw an Exception on entry
state.closeContinuation();
state.cancelContinuation();
}
}

public static <K> void closeAndClearContinuation(
public static <K> void cancelAndClearContinuation(
ContextStore<K, ConcurrentState> contextStore, K key) {
final ConcurrentState state = contextStore.get(key);
if (state == null) {
return;
}
state.closeAndClearContinuation();
state.cancelAndClearContinuation();
}

private boolean captureAndSetContinuation(final AgentScope scope) {
if (CONTINUATION.compareAndSet(this, null, CLAIMED)) {
// lazy write is guaranteed to be seen by getAndSet
CONTINUATION.lazySet(this, scope.captureConcurrent());
CONTINUATION.lazySet(this, scope.capture().hold());
return true;
}
return false;
Expand All @@ -99,14 +99,14 @@ private AgentScope activateAndContinueContinuation() {
return null;
}

private void closeContinuation() {
private void cancelContinuation() {
final AgentScope.Continuation continuation = CONTINUATION.get(this);
if (continuation != null && continuation != CLAIMED) {
continuation.cancel();
}
}

private void closeAndClearContinuation() {
private void cancelAndClearContinuation() {
final AgentScope.Continuation continuation = CONTINUATION.get(this);
if (continuation != null && continuation != CLAIMED) {
// We should never be able to reuse this state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ final class ContinuationClaim implements AgentScope.Continuation {

public static final ContinuationClaim CLAIMED = new ContinuationClaim();

@Override
public AgentScope.Continuation hold() {
throw new IllegalStateException();
}

@Override
public AgentScope activate() {
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static void exit(
boolean claimed = !wasClaimed && !hadExecutor && zis.getForkJoinTaskTag() == 1;
if (mode == ASYNC || (mode < ASYNC && claimed) || !zis.isLive()) {
contextStore = InstrumentationContext.get(UniCompletion.class, ConcurrentState.class);
ConcurrentState.closeAndClearContinuation(contextStore, zis);
ConcurrentState.cancelAndClearContinuation(contextStore, zis);
}
if (scope != null || throwable != null) {
if (contextStore == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void maybeInitialize() {
if (!isInitialized) {
final AgentScope activeScope = AgentTracer.get().activeScope();
if (activeScope != null && activeScope.isAsyncPropagating()) {
continuation = activeScope.captureConcurrent();
continuation = activeScope.capture().hold();
}
isInitialized = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ public Continuation capture() {
return delegate.capture();
}

@Override
public Continuation captureConcurrent() {
return delegate.captureConcurrent();
}

@Override
public void close() {
delegate.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ public Continuation capture() {
return delegate.capture();
}

@Override
public Continuation captureConcurrent() {
return delegate.captureConcurrent();
}

public boolean isFinishSpanOnClose() {
return finishSpanOnClose;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ public Continuation capture() {
return delegate.capture();
}

@Override
public Continuation captureConcurrent() {
return delegate.captureConcurrent();
}

public boolean isFinishSpanOnClose() {
return finishSpanOnClose;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ public static class NoopContinuation implements Continuation {

private NoopContinuation() {}

@Override
public Continuation hold() {
return this;
}

@Override
public TraceScope activate() {
return NoopTraceScope.INSTANCE;
Expand All @@ -24,11 +29,6 @@ public Continuation capture() {
return NoopContinuation.INSTANCE;
}

@Override
public Continuation captureConcurrent() {
return null;
}

@Override
public void close() {}
}
26 changes: 14 additions & 12 deletions dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,11 @@ public interface TraceScope extends Closeable {
*/
Continuation capture();

/**
* Prevent the trace attached to this TraceScope from reporting until the returned Continuation is
* either activated (and the returned scope is closed), or canceled.
*
* <p>Should be called on the parent thread.
*
* <p>If the returned {@link Continuation} is activated, it needs to be canceled in addition to
* the returned {@link TraceScope} being closed. This is to allow multiple concurrent threads that
* activate the continuation to race in a safe way, and close the scopes without fear of closing
* the related {@code Span} prematurely.
*/
Continuation captureConcurrent();
/** @deprecated Replaced by {@code capture().hold()}. */
@Deprecated
default Continuation captureConcurrent() {
return capture().hold();
}

/** Close the activated context and allow any underlying spans to finish. */
@Override
Expand Down Expand Up @@ -61,6 +54,15 @@ default void setAsyncPropagation(boolean value) {
*/
interface Continuation {

/**
* Prevent the trace attached to this scope from reporting until the continuation is explicitly
* cancelled. You must call {@link #cancel()} at some point to avoid discarding traces.
*
* <p>Use this when you want to let multiple threads activate the continuation concurrently and
* close their scopes without fear of prematurely closing the related span.
*/
Continuation hold();

/**
* Activate the continuation.
*
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,9 @@ public final void setAsyncPropagation(final boolean value) {
* @return The new continuation, or null if this scope is not async propagating.
*/
@Override
public final AbstractContinuation capture() {
public final ScopeContinuation capture() {
return isAsyncPropagating
? new SingleContinuation(scopeManager, span, source()).register()
: null;
}

/**
* The continuation returned must be closed or activated or the trace will not finish.
*
* @return The new continuation, or null if this scope is not async propagating.
*/
@Override
public final AbstractContinuation captureConcurrent() {
return isAsyncPropagating
? new ConcurrentContinuation(scopeManager, span, source()).register()
? new ScopeContinuation(scopeManager, span, source()).register()
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public AgentScope activate(
}

public AgentScope.Continuation captureSpan(final AgentSpan span) {
AbstractContinuation continuation =
new SingleContinuation(this, span, ScopeSource.INSTRUMENTATION.id());
ScopeContinuation continuation =
new ScopeContinuation(this, span, ScopeSource.INSTRUMENTATION.id());
continuation.register();
healthMetrics.onCaptureContinuation();
return continuation;
Expand Down Expand Up @@ -136,12 +136,12 @@ private AgentScope activate(
}

/**
* Activates a scope for the given {@link AbstractContinuation}.
* Activates a scope for the given {@link ScopeContinuation}.
*
* @param continuation {@code null} if a continuation is re-used
*/
ContinuableScope continueSpan(
final AbstractContinuation continuation, final AgentSpan span, final byte source) {
final ScopeContinuation continuation, final AgentSpan span, final byte source) {
ScopeStack scopeStack = scopeStack();

// optimization: if the top scope is already keeping the same span alive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

final class ContinuingScope extends ContinuableScope {
/** Continuation that created this scope. */
private final AbstractContinuation continuation;
private final ScopeContinuation continuation;

ContinuingScope(
final ContinuableScopeManager scopeManager,
final AgentSpan span,
final byte source,
final boolean isAsyncPropagating,
final AbstractContinuation continuation,
final ScopeContinuation continuation,
final Stateful scopeState) {
super(scopeManager, span, source, isAsyncPropagating, scopeState);
this.continuation = continuation;
Expand All @@ -21,7 +21,6 @@ final class ContinuingScope extends ContinuableScope {
@Override
void cleanup(final ScopeStack scopeStack) {
super.cleanup(scopeStack);

continuation.cancelFromContinuedScopeClose();
}
}
Loading
Loading