@@ -44,7 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
44
44
// Set a service name that gets sorted early with SORT_BY_NAMES
45
45
injectSysConfig(GeneralConfig . SERVICE_NAME , " A-service" )
46
46
injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , isDataStreamsEnabled(). toString())
47
- injectSysConfig(" sqs.body.propagation.enabled" , " true" )
47
+ injectSysConfig(" trace. sqs.body.propagation.enabled" , " true" )
48
48
}
49
49
50
50
@Shared
@@ -585,6 +585,35 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
585
585
client. shutdown()
586
586
}
587
587
588
+ def " Data streams context not extracted from message body when message attributes are not present" () {
589
+ setup :
590
+ def client = AmazonSQSClientBuilder . standard()
591
+ .withEndpointConfiguration(endpoint)
592
+ .withCredentials(credentialsProvider)
593
+ .build()
594
+ def queueUrl = client. createQueue(' somequeue' ). queueUrl
595
+ TEST_WRITER . clear()
596
+
597
+ when :
598
+ injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , " false" )
599
+ client. sendMessage(queueUrl, ' {"Message": "sometext"}' )
600
+ injectSysConfig(GeneralConfig . DATA_STREAMS_ENABLED , " true" )
601
+ def messages = client. receiveMessage(queueUrl). messages
602
+ messages. forEach {}
603
+
604
+ TEST_DATA_STREAMS_WRITER . waitForGroups(1 )
605
+
606
+ then :
607
+ StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
608
+
609
+ verifyAll(first) {
610
+ edgeTags == [" direction:in" , " topic:somequeue" , " type:sqs" ]
611
+ edgeTags. size() == 3
612
+ }
613
+
614
+ cleanup :
615
+ client. shutdown()
616
+ }
588
617
}
589
618
590
619
0 commit comments