Skip to content

feat(botocore): patch stepfunctions #7514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4498781
[SVLS-4126] Patch botocore stepfunctions
Nov 7, 2023
476f552
Inject trace context into step function input object
Nov 13, 2023
3c3f391
Add releasenote
Nov 13, 2023
b65ca3e
Update releasenotes/notes/inject-botocore-stepfunction-start_executio…
agocs Nov 27, 2023
1c8b409
Merge branch '2.x' into chris.agocs/patch_botocore_stepfunctions
agocs Nov 27, 2023
9e3c4b0
ruff
Nov 27, 2023
b9932c0
Fix statemachine arns
Nov 30, 2023
c12cfa4
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
ZStriker19 Dec 1, 2023
2a9408b
Create and delete the state machines
Dec 1, 2023
5701c9c
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Dec 4, 2023
d46ff52
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
emmettbutler Dec 5, 2023
b521496
Fix some details about how Patch works and get tests working as well
Dec 6, 2023
907c2ba
merge commit
Dec 6, 2023
a6e9fd5
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Dec 6, 2023
38705cb
Add more tests for inject_trace_to_stepfunction_input
Dec 11, 2023
facabb7
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Dec 11, 2023
788c4f0
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Dec 11, 2023
5741901
Merge main and fix merge conflicts; move patch stepfunctions into its…
Jan 11, 2024
4b05375
Merge branch 'chris.agocs/patch_botocore_stepfunctions' of github.com…
Jan 11, 2024
186bf7f
run the linter
Jan 11, 2024
a962c22
Merge branch 'main' into chris.agocs/patch_botocore_stepfunctions
agocs Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from .services.sqs import inject_trace_to_sqs_or_sns_batch_message
from .services.sqs import inject_trace_to_sqs_or_sns_message
from .services.sqs import patched_sqs_api_call
from .services.stepfunctions import inject_trace_to_stepfunction_input
from .services.stepfunctions import patched_stepfunction_api_call
from .utils import inject_trace_to_client_context
from .utils import inject_trace_to_eventbridge_detail
from .utils import set_patched_api_call_span_tags
Expand Down Expand Up @@ -150,6 +152,10 @@ def patched_api_call(original_func, instance, args, kwargs):
kwargs=kwargs,
function_vars=function_vars,
)
elif endpoint_name == "states":
return patched_stepfunction_api_call(
original_func=original_func, instance=instance, args=args, kwargs=kwargs, function_vars=function_vars
)
else:
# this is the default patched api call
return patched_api_call_fallback(
Expand Down Expand Up @@ -220,6 +226,16 @@ def patched_api_call_fallback(original_func, instance, args, kwargs, function_va
cloud_service="sns",
direction=SpanDirection.OUTBOUND,
)
if endpoint_name == "states" and (
operation == "StartExecution" or operation == "StartSyncExecution"
):
inject_trace_to_stepfunction_input(params, span)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="stepfunctions",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

Expand Down
108 changes: 108 additions & 0 deletions ddtrace/contrib/botocore/services/stepfunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import json
from typing import Any # noqa:F401
from typing import Dict # noqa:F401

import botocore.exceptions

from ddtrace import Span # noqa:F401
from ddtrace import config
from ddtrace.ext import http
from ddtrace.propagation.http import HTTPPropagator

from ....ext import SpanTypes
from ....internal.logger import get_logger
from ....internal.schema import SpanDirection
from ....internal.schema import schematize_cloud_messaging_operation
from ....internal.schema import schematize_service_name
from ..utils import set_patched_api_call_span_tags
from ..utils import set_response_metadata_tags


log = get_logger(__name__)


def inject_trace_to_stepfunction_input(params, span):
# type: (Any, Span) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated

Inject the trace headers into the StepFunction input if the input is a JSON string
"""
if "input" not in params:
log.warning("Unable to inject context. The StepFunction input had no input.")
return

if params["input"] is None:
log.warning("Unable to inject context. The StepFunction input was None.")
return

elif isinstance(params["input"], dict):
if "_datadog" in params["input"]:
log.warning("Input already has trace context.")
return
params["input"]["_datadog"] = {}
HTTPPropagator.inject(span.context, params["input"]["_datadog"])
return

elif isinstance(params["input"], str):
try:
input_obj = json.loads(params["input"])
except ValueError:
log.warning("Input is not a valid JSON string")
return

if isinstance(input_obj, dict):
input_obj["_datadog"] = {}
HTTPPropagator.inject(span.context, input_obj["_datadog"])
input_json = json.dumps(input_obj)

params["input"] = input_json
return
else:
log.warning("Unable to inject context. The StepFunction input was not a dict.")
return

else:
log.warning("Unable to inject context. The StepFunction input was not a dict or a JSON string.")


def patched_stepfunction_api_call(original_func, instance, args, kwargs: Dict, function_vars: Dict):
params = function_vars.get("params")
trace_operation = function_vars.get("trace_operation")
pin = function_vars.get("pin")
endpoint_name = function_vars.get("endpoint_name")
operation = function_vars.get("operation")

with pin.tracer.trace(
trace_operation,
service=schematize_service_name("{}.{}".format(pin.service, endpoint_name)),
span_type=SpanTypes.HTTP,
) as span:
set_patched_api_call_span_tags(span, instance, args, params, endpoint_name, operation)

if args:
if config.botocore["distributed_tracing"]:
try:
if endpoint_name == "states" and operation in {"StartExecution", "StartSyncExecution"}:
inject_trace_to_stepfunction_input(params, span)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="stepfunctions",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

try:
return original_func(*args, **kwargs)
except botocore.exceptions.ClientError as e:
set_response_metadata_tags(span, e.response)

# If we have a status code, and the status code is not an error,
# then ignore the exception being raised
status_code = span.get_tag(http.STATUS_CODE)
if status_code and not config.botocore.operations[span.resource].is_error_code(int(status_code)):
span._ignore_exception(botocore.exceptions.ClientError)
raise
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
botocore: Add the ability to inject trace context into the input field of botocore stepfunction start_execution and
start_sync_execution calls.
56 changes: 56 additions & 0 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from moto import mock_s3
from moto import mock_sns
from moto import mock_sqs
from moto import mock_stepfunctions
import pytest

from tests.utils import get_128_bit_trace_id_from_headers
Expand Down Expand Up @@ -885,6 +886,61 @@ def test_schematized_unspecified_service_sqs_client_v1(self):
assert spans[2].service == DEFAULT_SPAN_SERVICE_NAME
assert spans[2].name == "aws.sqs.receive"

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="lincoln",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln", input='{"baz":1}'
)
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln")

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection_with_array_input(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="miller",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller", input='["one", "two", "three"]'
)
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller")

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection_with_true_input(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="hobart",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart", input="true")
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart")

def _test_kinesis_client(self):
client = self.session.create_client("kinesis", region_name="us-east-1")
stream_name = "test"
Expand Down