Skip to content

Inject trace context into AWS Step Functions input #7585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7f8721d
inject step functions trace context
DylanLovesCoffee Sep 5, 2024
a974597
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Sep 9, 2024
7ecfa8c
remove previous logic
DylanLovesCoffee Sep 13, 2024
fa36832
aws sfn instrumentation
DylanLovesCoffee Sep 16, 2024
22c652b
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Sep 16, 2024
eb642b0
muzzle
DylanLovesCoffee Sep 16, 2024
0778f3d
single char
DylanLovesCoffee Sep 17, 2024
f9d0eea
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 8, 2024
421856b
update muzzle
DylanLovesCoffee Oct 8, 2024
895341a
refactor input attr injector
DylanLovesCoffee Oct 8, 2024
9d76dec
refactor interceptor
DylanLovesCoffee Oct 9, 2024
aba1119
cleanup test
DylanLovesCoffee Oct 9, 2024
e56f2d3
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 23, 2024
3039780
Merge branch 'master' into dylan/sfn-trace-ctx
DylanLovesCoffee Oct 23, 2024
370df6d
use JsonBuffer
DylanLovesCoffee Oct 24, 2024
a644a8b
catch exceptions
DylanLovesCoffee Oct 24, 2024
fc6e570
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Dec 3, 2024
ee83940
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 6, 2025
973cd79
add build.gradle comment
nhulston Feb 6, 2025
96f7f39
fix SfnClientInstrumentation class header
nhulston Feb 6, 2025
fdb01a2
update InputAttributeInjector to use `datadog.json` component https:/…
nhulston Feb 6, 2025
8417f6e
fix injection
nhulston Feb 6, 2025
0aac239
test edge cases
nhulston Feb 6, 2025
2992508
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 7, 2025
da7af0b
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 10, 2025
ca13195
improve `getModifiedInput`
nhulston Feb 11, 2025
2d4507d
test that tracer does not error on invalid json input
nhulston Feb 11, 2025
29a651c
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 11, 2025
a434846
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 12, 2025
1197c1d
don't throw, just return request
nhulston Feb 12, 2025
0f304c7
Merge branch 'master' into dylan/sfn-trace-ctx
nhulston Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions dd-java-agent/instrumentation/aws-java-sfn-2.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
muzzle {
pass {
group = "software.amazon.awssdk"
module = "sfn"
// 2.15.35 is the minimum version with step functions
versions = "[2.15.35,)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2')
testImplementation 'software.amazon.awssdk:sfn:2.15.35'
testImplementation libs.testcontainers

latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+'
}

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.json.JsonMapper;
import datadog.json.JsonWriter;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

public class InputAttributeInjector {
private static final String DATADOG_KEY = "_datadog";

public static String buildTraceContext(AgentSpan span) {
String tagsJson = JsonMapper.toJson(span.getTags());
try (JsonWriter writer = new JsonWriter()) {
writer.beginObject();
writer.name("x-datadog-trace-id").value(span.getTraceId().toString());
writer.name("x-datadog-parent-id").value(String.valueOf(span.getSpanId()));
writer.name("x-datadog-tags").jsonValue(tagsJson);
writer.endObject();
return writer.toString();
} catch (Exception e) {
return null;
}
}

public static String getModifiedInput(String request, String ddTraceContextJSON) {
if (request == null || ddTraceContextJSON == null) {
return request; // leave request unmodified
}

final String traceContextProperty = "\"" + DATADOG_KEY + "\":" + ddTraceContextJSON;
int startPos = request.indexOf('{');
int endPos = request.lastIndexOf('}');

if (startPos < 0 || endPos < startPos) {
return request; // leave request unmodified
}

// If input is an empty {}
if (endPos == startPos + 1) {
return "{" + traceContextProperty + "}";
}

String existingJSON = request.substring(startPos + 1, endPos).trim();
if (existingJSON.isEmpty()) {
return "{" + traceContextProperty + "}";
} else {
return "{" + existingJSON + "," + traceContextProperty + "}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.List;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/** AWS SDK v2 Step Function instrumentation */
@AutoService(InstrumenterModule.class)
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public SfnClientInstrumentation() {
super("sfn", "aws-sdk");
}

@Override
public String instrumentedType() {
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("resolveExecutionInterceptors")),
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"};
}

public static class AwsSfnBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) {
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof SfnInterceptor) {
return;
}
}
interceptors.add(new SfnInterceptor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.instrumentation.aws.v2.sfn;

import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest;
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest;

public class SfnInterceptor implements ExecutionInterceptor {

public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
InstanceStore.of(ExecutionAttribute.class)
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));

public SfnInterceptor() {}

@Override
public SdkRequest modifyRequest(
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
try {
return modifyRequestImpl(context, executionAttributes);
} catch (Exception e) {
return context.request();
}
}

public SdkRequest modifyRequestImpl(
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
// StartExecutionRequest
if (context.request() instanceof StartExecutionRequest) {
StartExecutionRequest request = (StartExecutionRequest) context.request();
if (request.input() == null) {
return request;
}
return injectTraceContext(span, request);
}

// StartSyncExecutionRequest
if (context.request() instanceof StartSyncExecutionRequest) {
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request();
if (request.input() == null) {
return request;
}
return injectTraceContext(span, request);
}

return context.request();
}

private SdkRequest injectTraceContext(AgentSpan span, StartExecutionRequest request) {
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the StartExecutionRequest input
String modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput).build();
}

private SdkRequest injectTraceContext(AgentSpan span, StartSyncExecutionRequest request) {
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the StartSyncExecutionRequest input
String modifiedInput =
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);

return request.toBuilder().input(modifiedInput).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.services.sfn.SfnClient
import software.amazon.awssdk.services.sfn.model.SfnException
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import spock.lang.Shared

import java.time.Duration

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan


abstract class SfnClientTest extends VersionedNamingTestBase {
@Shared GenericContainer localStack
@Shared SfnClient sfnClient
@Shared String testStateMachineARN
@Shared Object endPoint

def setupSpec() {
localStack = new GenericContainer(DockerImageName.parse("localstack/localstack"))
.withExposedPorts(4566)
.withEnv("SERVICES", "stepfunctions")
.withReuse(true)
.withStartupTimeout(Duration.ofSeconds(120))
localStack.start()
endPoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566)
sfnClient = SfnClient.builder()
.endpointOverride(URI.create(endPoint))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
.build()

def response = sfnClient.createStateMachine { builder ->
builder.name("testStateMachine")
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}")
.build()
}
testStateMachineARN = response.stateMachineArn()
}

def cleanupSpec() {
sfnClient.close()
localStack.stop()
}

def "Step Functions span is created"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

then:
assertTraces(1) {
trace(2) {
basicSpan(it, "parent")
span {
serviceName service()
operationName operation()
resourceName "Sfn.StartExecution"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" endPoint+'/'
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" localStack.getMappedPort(4566)
"$Tags.PEER_HOSTNAME" localStack.getHost()
"aws.service" "Sfn"
"aws.operation" "StartExecution"
"aws.agent" "java-aws-sdk"
"aws.requestId" response.responseMetadata().requestId()
"aws_service" "Sfn"
defaultTags()
}
}
}
}
}

def "Trace context is injected to Step Functions input"() {
when:
StartExecutionResponse response
TraceUtils.runUnderTrace('parent', {
response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}
})

then:
def execution = sfnClient.describeExecution { builder ->
builder.executionArn(response.executionArn())
.build()
}
def input = new JsonSlurper().parseText(execution.input())
input["key"] == "value"
input["_datadog"]["x-datadog-trace-id"] != null
input["_datadog"]["x-datadog-parent-id"] != null
input["_datadog"]["x-datadog-tags"] != null
}

def "AWS rejects invalid JSON but instrumentation does not error"() {
when:
sfnClient.startExecution { b ->
b.stateMachineArn(testStateMachineARN)
.input("hello") // invalid JSON
.build()
}

then:
thrown(SfnException)
}

def "Doesn't cause error for Step Functions input edge cases"() {
def inputs = [
'''{}''',
'''{ }''',
''' { } ''',
'''{"foo": "bar"}''',
''' { "foo" : "bar" } ''',
'''{"key1": "val1", "key2": "val2"}''',
''' { "key1" : "val1" , "key2" : "val2" } '''
]

when:
inputs.forEach { input ->
TraceUtils.runUnderTrace('parent', {
sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input(input)
.build()
}
})
}

then:
noExceptionThrown()
}
}

class SfnClientV0Test extends SfnClientTest {
@Override
int version() {
0
}

@Override
String service() {
return "java-aws-sdk"
}

@Override
String operation() {
return "aws.http"
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ include ':dd-java-agent:instrumentation:aws-common'
include ':dd-java-agent:instrumentation:aws-java-eventbridge-2.0'
include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0'
include ':dd-java-agent:instrumentation:aws-java-sdk-2.2'
include ':dd-java-agent:instrumentation:aws-java-sfn-2.0'
include ':dd-java-agent:instrumentation:aws-java-sns-1.0'
include ':dd-java-agent:instrumentation:aws-java-sns-2.0'
include ':dd-java-agent:instrumentation:aws-java-sqs-1.0'
Expand Down
Loading