Skip to content

Inject trace context into AWS Step Functions input #7585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into master from dylan/sfn-trace-ctx
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7f8721d
inject step functions trace context
DylanLovesCoffee Sep 5, 2024
a974597
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Sep 9, 2024
7ecfa8c
remove previous logic
DylanLovesCoffee Sep 13, 2024
fa36832
aws sfn instrumentation
DylanLovesCoffee Sep 16, 2024
22c652b
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Sep 16, 2024
eb642b0
muzzle
DylanLovesCoffee Sep 16, 2024
0778f3d
single char
DylanLovesCoffee Sep 17, 2024
f9d0eea
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 8, 2024
421856b
update muzzle
DylanLovesCoffee Oct 8, 2024
895341a
refactor input attr injector
DylanLovesCoffee Oct 8, 2024
9d76dec
refactor interceptor
DylanLovesCoffee Oct 9, 2024
aba1119
cleanup test
DylanLovesCoffee Oct 9, 2024
e56f2d3
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 23, 2024
3039780
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 23, 2024
370df6d
use JsonBuffer
DylanLovesCoffee Oct 24, 2024
a644a8b
catch exceptions
DylanLovesCoffee Oct 24, 2024
fc6e570
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Dec 3, 2024
ee83940
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 6, 2025
973cd79
add build.gradle comment
nhulston Feb 6, 2025
96f7f39
fix SfnClientInstrumentation class header
nhulston Feb 6, 2025
fdb01a2
update InputAttributeInjector to use `datadog.json` component https:/…
nhulston Feb 6, 2025
8417f6e
fix injection
nhulston Feb 6, 2025
0aac239
test edge cases
nhulston Feb 6, 2025
2992508
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 7, 2025
da7af0b
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 10, 2025
ca13195
improve `getModifiedInput`
nhulston Feb 11, 2025
2d4507d
test that tracer does not error on invalid json input
nhulston Feb 11, 2025
29a651c
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 11, 2025
a434846
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 12, 2025
1197c1d
don't throw, just return request
nhulston Feb 12, 2025
0f304c7
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions dd-java-agent/instrumentation/aws-java-sfn-2.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Build script for the AWS SDK v2 Step Functions (sfn) instrumentation module.

// Muzzle check: the instrumentation is declared to pass against the
// software.amazon.awssdk:sfn artifact from 2.15.35 upwards (open-ended range).
// NOTE(review): assertInverse presumably also asserts that versions outside the
// range do NOT pass - confirm against the muzzle plugin documentation.
muzzle {
pass {
group = "software.amazon.awssdk"
module = "sfn"
versions = "[2.15.35,)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

// Extra test suites that reuse the sources in the 'test' directory;
// 'latestDepForkedTest' extends 'latestDepTest'.
addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
// Compile against the same minimum version declared in the muzzle range above.
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2')
// Regular tests run against a fixed, newer SDK version than the compile-time one.
testImplementation 'software.amazon.awssdk:sfn:2.27.2'
testImplementation 'org.testcontainers:localstack:1.19.7'

// The latestDep suite resolves the newest published sfn artifact ('+').
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+'
}

// Every Test task registers its use of the 'testcontainersLimit' build service.
// NOTE(review): presumably this caps how many container-based test tasks run in
// parallel - confirm where testcontainersLimit is defined.
tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

/**
 * Builds the Datadog trace-context JSON member and splices it into the JSON input of an AWS Step
 * Functions execution request.
 *
 * <p>All methods are static and work on plain strings; no JSON library is used.
 */
public class InputAttributeInjector {
  /**
   * Renders the trace context of {@code span} as a single JSON object member of the form {@code
   * "_datadog": { "x-datadog-trace-id": "...", "x-datadog-parent-id": "...", "x-datadog-tags":
   * {...} }} (no surrounding braces), ready to be inserted into an existing JSON object.
   *
   * <p>Every span tag is emitted as a string-valued entry of {@code x-datadog-tags}. Keys and
   * values are JSON-escaped, and a span without tags yields an empty object {@code {}}.
   *
   * @param span the span whose trace id, span id and tags are serialized; must not be null
   * @return the {@code "_datadog"} JSON member as a string
   */
  public static String buildTraceContext(AgentSpan span) {
    // Extract span tags
    StringBuilder spanTagsJSON = new StringBuilder();
    spanTagsJSON.append('{');
    span.getTags()
        .forEach(
            (tagKey, tagValue) -> {
              spanTagsJSON.append('"');
              appendJsonEscaped(spanTagsJSON, String.valueOf(tagKey));
              spanTagsJSON.append("\":\"");
              appendJsonEscaped(spanTagsJSON, String.valueOf(tagValue));
              spanTagsJSON.append("\",");
            });
    // Drop the trailing comma, but only if at least one tag was written; otherwise the opening
    // brace itself would be removed and the result would not be valid JSON.
    if (spanTagsJSON.length() > 1) {
      spanTagsJSON.setLength(spanTagsJSON.length() - 1);
    }
    spanTagsJSON.append('}');

    // Build DD trace context object
    return String.format(
        "\"_datadog\": { \"x-datadog-trace-id\": \"%s\",\"x-datadog-parent-id\":\"%s\", \"x-datadog-tags\": %s }",
        span.getTraceId().toString(), span.getSpanId(), spanTagsJSON);
  }

  /**
   * Returns a copy of {@code request} with {@code ddTraceContextJSON} inserted as the last member
   * of the top-level JSON object.
   *
   * <p>The context is only injected when, ignoring surrounding whitespace, {@code request} starts
   * with <code>{</code> and ends with <code>}</code>. Any other input (empty text, arrays, scalars,
   * malformed text) is returned unchanged instead of throwing, so that instrumentation never
   * breaks or corrupts the caller's request.
   *
   * @param request the original Step Functions input; must not be null
   * @param ddTraceContextJSON the JSON member to insert, as produced by {@link
   *     #buildTraceContext(AgentSpan)}
   * @return a new {@link StringBuilder} holding the (possibly) modified input
   */
  public static StringBuilder getModifiedInput(String request, String ddTraceContextJSON) {
    StringBuilder modifiedInput = new StringBuilder(request);

    // Locate the first and last non-whitespace characters of the input.
    int startPos = 0;
    int endPos = request.length() - 1;
    while (startPos <= endPos && Character.isWhitespace(request.charAt(startPos))) {
      startPos++;
    }
    while (endPos > startPos && Character.isWhitespace(request.charAt(endPos))) {
      endPos--;
    }

    // Only a top-level JSON object can receive an extra member.
    if (startPos >= endPos || request.charAt(startPos) != '{' || request.charAt(endPos) != '}') {
      return modifiedInput;
    }

    // An object holding only whitespace (e.g. "{ }") has no members, so no comma is needed.
    boolean emptyObject = true;
    for (int i = startPos + 1; i < endPos; i++) {
      if (!Character.isWhitespace(request.charAt(i))) {
        emptyObject = false;
        break;
      }
    }

    if (emptyObject) {
      modifiedInput.insert(endPos, ddTraceContextJSON);
    } else {
      modifiedInput.insert(
          endPos, String.format(", %s", ddTraceContextJSON)); // prepend comma to existing input
    }
    return modifiedInput;
  }

  /** Appends {@code text} to {@code out}, escaping characters that JSON strings must escape. */
  private static void appendJsonEscaped(StringBuilder out, String text) {
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      switch (c) {
        case '"':
          out.append("\\\"");
          break;
        case '\\':
          out.append("\\\\");
          break;
        case '\n':
          out.append("\\n");
          break;
        case '\r':
          out.append("\\r");
          break;
        case '\t':
          out.append("\\t");
          break;
        default:
          if (c < 0x20) {
            // Remaining control characters must use the \\uXXXX form.
            out.append(String.format("\\u%04x", (int) c));
          } else {
            out.append(c);
          }
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.List;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/**
 * AWS SDK v2 Step Function instrumentation.
 *
 * <p>Applies advice to {@code SdkDefaultClientBuilder.resolveExecutionInterceptors} so that an
 * {@link SfnInterceptor} is present in the list of execution interceptors returned by that method.
 */
@AutoService(InstrumenterModule.class)
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType {

// Registers the module under the names "sfn" and "aws-sdk".
public SfnClientInstrumentation() {
super("sfn", "aws-sdk");
}

// The instrumented type is the SDK's core client builder rather than an SFN-specific class;
// the interceptor itself decides, per request type, whether it has anything to do.
@Override
public String instrumentedType() {
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
}

// Attaches AwsSfnBuilderAdvice to the method named "resolveExecutionInterceptors".
@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("resolveExecutionInterceptors")),
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice");
}

// Classes from this package that the advice and interceptor depend on.
// NOTE(review): presumably these are injected into the application class loader by the agent
// tooling - confirm against InstrumenterModule.
@Override
public String[] helperClassNames() {
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"};
}

/** Advice run on exit of {@code resolveExecutionInterceptors}. */
public static class AwsSfnBuilderAdvice {
/**
 * Appends a new {@link SfnInterceptor} to the returned interceptor list unless one is already
 * in it. Any throwable raised by this advice is suppressed ({@code suppress = Throwable.class}).
 *
 * @param interceptors the list returned by the instrumented method; it is mutated in place
 */
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) {
// Skip registration when an SfnInterceptor is already in the list, to avoid duplicates.
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof SfnInterceptor) {
return;
}
}
interceptors.add(new SfnInterceptor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest;
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest;

/**
 * AWS SDK execution interceptor that adds Datadog trace context to the JSON input of Step
 * Functions {@code StartExecutionRequest} and {@code StartSyncExecutionRequest} calls. All other
 * request types are passed through untouched.
 */
public class SfnInterceptor implements ExecutionInterceptor {

// Execution attribute under which the current span is looked up. The instance is obtained from
// (or created in) the InstanceStore under the key "DatadogSpan".
// NOTE(review): presumably the core AWS SDK instrumentation stores the span under this same
// shared attribute - confirm against that module.
public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
InstanceStore.of(ExecutionAttribute.class)
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));

public SfnInterceptor() {}

/**
 * Rewrites the request input to carry the trace context when the request is a Step Functions
 * start-execution call with a non-null input.
 *
 * @param context gives access to the request being sent
 * @param executionAttributes attributes of the current execution; the span is read from here
 * @return a rebuilt request with the modified input, or the original request when it is of
 *     another type or has no input
 */
@Override
public SdkRequest modifyRequest(
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
// NOTE(review): the span is passed on without a null check and buildTraceContext
// dereferences it - confirm the attribute is always populated before this interceptor runs.
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
// Asynchronous start: StartExecutionRequest
if (context.request() instanceof StartExecutionRequest) {
StartExecutionRequest request = (StartExecutionRequest) context.request();
// Nothing to inject into when the caller supplied no input.
if (request.input() == null) {
return request;
}
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the Step Function input
StringBuilder modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput.toString()).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be deduplicated using a dedicated method:

SdkRequest injectTraceContext(request, span) {
      String traceContext = InputAttributeInjector.buildTraceContext(span);
      // Inject the trace context into the Step Function input
      String modifiedInput = InputAttributeInjector.getModifiedInput(request.input(), traceContext);
      return request.toBuilder().input(modifiedInput).build()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I had to handle both StartExecutionRequest and StartSyncExecutionRequest param types I ended up doing method overloading for injectTraceContext()

}

// Synchronous start: StartSyncExecutionRequest (same steps as the branch above)
if (context.request() instanceof StartSyncExecutionRequest) {
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request();
// Nothing to inject into when the caller supplied no input.
if (request.input() == null) {
return request;
}
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the Step Function input
StringBuilder modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput.toString()).build();
}

// Any other request type is returned unmodified.
return context.request();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.services.sfn.SfnClient
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import spock.lang.Shared

import java.time.Duration

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan


/**
 * Integration tests for the Step Functions instrumentation. They run an SfnClient against a
 * LocalStack container and check both the produced trace and the trace context injected into
 * the execution input.
 */
abstract class SfnClientTest extends VersionedNamingTestBase {
// LocalStack container exposing port 4566 with only the "stepfunctions" service enabled.
// NOTE(review): the image has no explicit tag, and withReuse(true) is combined with an explicit
// stop() in cleanupSpec - confirm both are intended.
static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack"))
.withExposedPorts(4566)
.withEnv("SERVICES", "stepfunctions")
.withReuse(true)
.withStartupTimeout(Duration.ofSeconds(120))

@Shared SfnClient sfnClient

@Shared String testStateMachineARN

// Starts LocalStack, builds a client pointed at it, and creates the state machine used by all tests.
def setupSpec() {
LOCALSTACK.start()
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566)
sfnClient = SfnClient.builder()
.endpointOverride(URI.create(endPoint))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
.build()

// Minimal state machine definition: a single "Pass" state that ends the execution.
def response = sfnClient.createStateMachine { builder ->
builder.name("testStateMachine")
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}")
.build()
}
testStateMachineARN = response.stateMachineArn()
}

def cleanupSpec() {
LOCALSTACK.stop()
}

// Verifies that startExecution under a parent span yields one trace with two spans:
// the parent and an "aws.http" client span carrying the expected tags.
def "Step Functions span is created"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

// Recomputed here because the HTTP_URL tag assertion below needs the endpoint.
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566)


then:
assertTraces(1) {
trace(2) {
basicSpan(it, "parent")
span {
serviceName "java-aws-sdk"
operationName "aws.http"
resourceName "Sfn.StartExecution"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" endPoint+'/'
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566)
"$Tags.PEER_HOSTNAME" LOCALSTACK.getHost()
"aws.service" "Sfn"
"aws.operation" "StartExecution"
"aws.agent" "java-aws-sdk"
"aws.requestId" response.responseMetadata().requestId()
"aws_service" "Sfn"
defaultTags()
}
}
}
}
}

// Verifies that the input stored for the execution still holds the caller's data and also
// contains a "_datadog" object with trace id, parent id and tags.
def "Trace context is injected to Step Functions input"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

then:
// Read the execution back to inspect the input that was actually sent.
def execution = sfnClient.describeExecution { builder ->
builder.executionArn(response.executionArn())
.build()
}
def input = new JsonSlurper().parseText(execution.input())
input["key"] == "value"
input["_datadog"]["x-datadog-trace-id"] != null
input["_datadog"]["x-datadog-parent-id"] != null
input["_datadog"]["x-datadog-tags"] != null
}
}

/**
 * Runs the SfnClientTest suite with naming version 0 and with no service or operation
 * name supplied (both are null).
 */
class SfnClientV0Test extends SfnClientTest {
  @Override
  int version() {
    return 0
  }

  @Override
  String service() {
    null
  }

  @Override
  String operation() {
    null
  }
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ include ':dd-java-agent:instrumentation:aws-java-sns-1.0'
include ':dd-java-agent:instrumentation:aws-java-sns-2.0'
include ':dd-java-agent:instrumentation:aws-java-sqs-1.0'
include ':dd-java-agent:instrumentation:aws-java-sqs-2.0'
// NOTE(review): the neighbouring entries are in alphabetical order; 'aws-java-sfn-2.0' would sort
// before the 'aws-java-sns-*' modules - confirm whether ordering is enforced for this list.
include ':dd-java-agent:instrumentation:aws-java-sfn-2.0'
include ':dd-java-agent:instrumentation:aws-lambda-handler'
include ':dd-java-agent:instrumentation:axis-2'
include ':dd-java-agent:instrumentation:axway-api'
Expand Down