Skip to content

feat(botocore): patch stepfunctions #7514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4498781
[SVLS-4126] Patch botocore stepfunctions
Nov 7, 2023
476f552
Inject trace context into step function input object
Nov 13, 2023
3c3f391
Add releasenote
Nov 13, 2023
b65ca3e
Update releasenotes/notes/inject-botocore-stepfunction-start_executio…
agocs Nov 27, 2023
1c8b409
Merge branch '2.x' into chris.agocs/patch_botocore_stepfunctions
agocs Nov 27, 2023
9e3c4b0
ruff
Nov 27, 2023
b9932c0
Fix statemachine arns
Nov 30, 2023
c12cfa4
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
ZStriker19 Dec 1, 2023
2a9408b
Create and delete the state machines
Dec 1, 2023
5701c9c
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Dec 4, 2023
d46ff52
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
emmettbutler Dec 5, 2023
b521496
Fix some details about how Patch works and get tests working as well
Dec 6, 2023
907c2ba
merge commit
Dec 6, 2023
a6e9fd5
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Dec 6, 2023
38705cb
Add more tests for inject_trace_to_stepfunction_input
Dec 11, 2023
facabb7
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Dec 11, 2023
788c4f0
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Dec 11, 2023
5741901
Merge main and fix merge conflicts; move patch stepfunctions into its…
Jan 11, 2024
4b05375
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Jan 11, 2024
186bf7f
run the linter
Jan 11, 2024
a962c22
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,46 @@ def inject_trace_to_sqs_or_sns_message(params, span, endpoint_service=None, pin=
inject_trace_data_to_message_attributes(trace_data, params, endpoint_service)


def inject_trace_to_stepfunction_input(params, span):
# type: (Any, Span) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated

Inject the trace headers into the StepFunction input if the input is a JSON string
"""
if "input" not in params:
log.warning("Unable to inject context. The StepFunction input had no input.")
return

if params["input"] is None:
log.warning("Unable to inject context. The StepFunction input was None.")
return

if isinstance(params["input"], dict):
if "_datadog" in params["input"]:
log.warning("Input already has trace context.")
return
params["input"]["_datadog"] = {}
HTTPPropagator.inject(span.context, params["input"]["_datadog"])
return

if isinstance(params["input"], str):
try:
input_obj = json.loads(params["input"])
except ValueError:
log.warning("Input is not a valid JSON string")
return

input_obj["_datadog"] = {}
HTTPPropagator.inject(span.context, input_obj["_datadog"])
input_json = json.dumps(input_obj)

params["input"] = input_json

log.warning("Unable to inject context. The StepFunction input was not a dict or a JSON string.")


def inject_trace_to_eventbridge_detail(params, span):
# type: (Any, Span) -> None
"""
Expand Down Expand Up @@ -574,6 +614,16 @@ def patched_api_call(original_func, instance, args, kwargs):
span.name = schematize_cloud_messaging_operation(
trace_operation, cloud_provider="aws", cloud_service="sns", direction=SpanDirection.OUTBOUND
)
if endpoint_name == "stepfunctions" and (
operation == "start-execution" or operation == "start-sync-execution"
):
inject_trace_to_stepfunction_input(params, span)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="stepfunctions",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
botocore: Add the ability to inject trace context into the input field of botocore stepfunction start_execution and
start_sync_execution calls.
36 changes: 36 additions & 0 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from moto import mock_s3
from moto import mock_sns
from moto import mock_sqs
from moto import mock_stepfunctions
import pytest

from tests.utils import get_128_bit_trace_id_from_headers
Expand All @@ -42,6 +43,7 @@
from ddtrace.internal.schema import DEFAULT_SPAN_SERVICE_NAME
from ddtrace.internal.utils.version import parse_version
from ddtrace.propagation.http import HTTP_HEADER_PARENT_ID
from ddtrace.propagation.http import HTTP_HEADER_TRACE_ID
from tests.opentracer.utils import init_tracer
from tests.utils import TracerTestCase
from tests.utils import assert_is_measured
Expand Down Expand Up @@ -885,6 +887,40 @@ def test_schematized_unspecified_service_sqs_client_v1(self):
assert spans[2].service == DEFAULT_SPAN_SERVICE_NAME
assert spans[2].name == "aws.sqs.receive"

@mock_stepfunctions
def test_stepfunctions_client(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2")
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:425362996713:stateMachine:foo", name="bar", input='{"baz":1}'
)
spans = self.get_spans()
assert spans
span = spans[0]
assert len(spans) == 1
assert span.get_tag("aws.region") == "us-west-2"
assert span.get_tag("region") == "us-west-2"
assert span.get_tag("aws.operation") == "StartExecution"
assert span.get_tag("component") == "botocore"
assert span.get_tag("span.kind"), "client"
assert_is_measured(span)
assert_span_http_status_code(span, 200)
assert span.service == "test-botocore-tracing.stepfunctions"
assert span.resource == "stepfunctions.startexecution"
assert span.get_tag("params.input") == '{"baz":1}'

def test_stepfunctions_send_start_execution_trace_injection(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2")
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:425362996713:stateMachine:foo", name="bar", input='{"baz":1}'
)
spans = self.get_spans()
assert spans
span = spans[0]
input_obj = json.loads(span.get_tag("params.input"))
assert input_obj["_datadog"][HTTP_HEADER_TRACE_ID] == str(span.trace_id)

def _test_kinesis_client(self):
client = self.session.create_client("kinesis", region_name="us-east-1")
stream_name = "test"
Expand Down