@@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
44
44
// Set a service name that gets sorted early with SORT_BY_NAMES
45
45
injectSysConfig(GeneralConfig . SERVICE_NAME , " A-service" )
46
46
injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , isDataStreamsEnabled(). toString())
47
+ injectSysConfig(" sqs.body.propagation.enabled" , " true" )
47
48
}
48
49
49
50
@Shared
@@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest {
511
512
}
512
513
513
514
class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
515
+ private static final String MESSAGE_WITH_ATTRIBUTES = " {\n " +
516
+ " \" Type\" : \" Notification\" ,\n " +
517
+ " \" MessageId\" : \" cb337e2a-1c06-5629-86f5-21fba14fb492\" ,\n " +
518
+ " \" TopicArn\" : \" arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\" ,\n " +
519
+ " \" Message\" : \" Some message\" ,\n " +
520
+ " \" Timestamp\" : \" 2024-12-10T03:52:41.662Z\" ,\n " +
521
+ " \" SignatureVersion\" : \" 1\" ,\n " +
522
+ " \" Signature\" : \" ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\" ,\n " +
523
+ " \" SigningCertURL\" : \" https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\" ,\n " +
524
+ " \" UnsubscribeURL\" : \" https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\" ,\n " +
525
+ " \" MessageAttributes\" : {\n " +
526
+ " \" _datadog\" : {\" Type\" :\" Binary\" ,\" Value\" :\" eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\" }\n " +
527
+ " }\n " +
528
+ " }"
529
+
530
+
514
531
@Override
515
532
String expectedOperation (String awsService , String awsOperation ) {
516
533
if (awsService == " SQS" ) {
@@ -537,6 +554,37 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
537
554
int version () {
538
555
1
539
556
}
557
+
558
+ def " Data streams context extracted from message body" () {
559
+ setup :
560
+ def client = AmazonSQSClientBuilder . standard()
561
+ .withEndpointConfiguration(endpoint)
562
+ .withCredentials(credentialsProvider)
563
+ .build()
564
+ def queueUrl = client. createQueue(' somequeue' ). queueUrl
565
+ TEST_WRITER . clear()
566
+
567
+ when :
568
+ injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , " false" )
569
+ client. sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES )
570
+ injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , " true" )
571
+ def messages = client. receiveMessage(queueUrl). messages
572
+ messages. forEach {/* consume to create message spans */ }
573
+
574
+ TEST_DATA_STREAMS_WRITER . waitForGroups(1 )
575
+
576
+ then :
577
+ StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == -2734507826469073289 }
578
+
579
+ verifyAll(first) {
580
+ edgeTags == [" direction:in" , " topic:somequeue" , " type:sqs" ]
581
+ edgeTags. size() == 3
582
+ }
583
+
584
+ cleanup :
585
+ client. shutdown()
586
+ }
587
+
540
588
}
541
589
542
590
0 commit comments