Skip to content

Commit c36be03

Browse files
committed
Fix KafkaStreamsTest
1 parent 4f278de commit c36be03

File tree

1 file changed

+24
-7
lines changed

1 file changed

+24
-7
lines changed

dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import spock.lang.Shared
2727

2828
import java.util.concurrent.LinkedBlockingQueue
2929
import java.util.concurrent.TimeUnit
30+
import java.util.regex.Pattern
3031

3132
@Flaky("https://github.com/DataDog/dd-trace-java/issues/3865")
3233
class KafkaStreamsTest extends AgentTestRunner {
@@ -39,6 +40,11 @@ class KafkaStreamsTest extends AgentTestRunner {
3940
@Shared
4041
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka
4142

43+
def setup() {
44+
// Filter out additional traces for kafka.poll operation, otherwise, there will be more traces than expected.
45+
TEST_WRITER.setFilter { trace -> trace[0].operationName.toString() != 'kafka.poll' }
46+
}
47+
4248
@Override
4349
protected boolean isDataStreamsEnabled() {
4450
return true
@@ -135,6 +141,7 @@ class KafkaStreamsTest extends AgentTestRunner {
135141
if ({ isDataStreamsEnabled()}) {
136142
"$DDTags.PATHWAY_HASH" { String }
137143
}
144+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
138145
defaultTagsNoPeerService()
139146
}
140147
}
@@ -159,7 +166,6 @@ class KafkaStreamsTest extends AgentTestRunner {
159166
"$InstrumentationTags.PARTITION" { it >= 0 }
160167
"$InstrumentationTags.OFFSET" 0
161168
"$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000"
162-
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
163169
"asdf" "testing"
164170
if ({ isDataStreamsEnabled()}) {
165171
"$DDTags.PATHWAY_HASH" { String }
@@ -186,6 +192,7 @@ class KafkaStreamsTest extends AgentTestRunner {
186192
if ({ isDataStreamsEnabled()}) {
187193
"$DDTags.PATHWAY_HASH" { String }
188194
}
195+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
189196
defaultTagsNoPeerService()
190197
}
191198
}
@@ -212,6 +219,7 @@ class KafkaStreamsTest extends AgentTestRunner {
212219
if ({ isDataStreamsEnabled()}) {
213220
"$DDTags.PATHWAY_HASH" { String }
214221
}
222+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
215223
defaultTags(true)
216224
}
217225
}
@@ -226,8 +234,11 @@ class KafkaStreamsTest extends AgentTestRunner {
226234
if (isDataStreamsEnabled()) {
227235
StatsGroup originProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
228236
verifyAll(originProducerPoint) {
229-
edgeTags == ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]
230-
edgeTags.size() == 3
237+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
238+
for (String tag : ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]) {
239+
assert edgeTags.contains(tag)
240+
}
241+
edgeTags.size() == 4
231242
}
232243

233244
StatsGroup kafkaStreamsConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == originProducerPoint.hash }
@@ -243,14 +254,20 @@ class KafkaStreamsTest extends AgentTestRunner {
243254

244255
StatsGroup kafkaStreamsProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsConsumerPoint.hash }
245256
verifyAll(kafkaStreamsProducerPoint) {
246-
edgeTags == ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]
247-
edgeTags.size() == 3
257+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
258+
for (String tag : ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]) {
259+
assert edgeTags.contains(tag)
260+
}
261+
edgeTags.size() == 4
248262
}
249263

250264
StatsGroup finalConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsProducerPoint.hash }
251265
verifyAll(finalConsumerPoint) {
252-
edgeTags == ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]
253-
edgeTags.size() == 4
266+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
267+
for (String tag : ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]) {
268+
assert edgeTags.contains(tag)
269+
}
270+
edgeTags.size() == 5
254271
}
255272
}
256273

0 commit comments

Comments
 (0)