diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcCodeOriginTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcCodeOriginTest.groovy deleted file mode 100644 index 7b207f5f2f9..00000000000 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcCodeOriginTest.groovy +++ /dev/null @@ -1,199 +0,0 @@ -import com.google.common.util.concurrent.MoreExecutors -import datadog.trace.agent.test.naming.VersionedNamingTestBase -import datadog.trace.bootstrap.debugger.DebuggerContext -import example.GreeterGrpc -import example.Helloworld -import io.grpc.BindableService -import io.grpc.ManagedChannel -import io.grpc.Server -import io.grpc.inprocess.InProcessChannelBuilder -import io.grpc.inprocess.InProcessServerBuilder -import io.grpc.stub.StreamObserver - -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference - -import static datadog.trace.agent.test.asserts.TagsAssert.assertTags - - -abstract class GrpcCodeOriginTest extends VersionedNamingTestBase { - @Override - final String service() { - return null - } - - @Override - final String operation() { - return null - } - - protected abstract String clientOperation() - - protected abstract String serverOperation() - - protected boolean hasClientMessageSpans() { - false - } - - @Override - protected void configurePreAgent() { - super.configurePreAgent() - injectSysConfig("dd.trace.grpc.ignored.inbound.methods", "example.Greeter/IgnoreInbound") - injectSysConfig("dd.trace.grpc.ignored.outbound.methods", "example.Greeter/Ignore") - if (hasClientMessageSpans()) { - injectSysConfig("integration.grpc-message.enabled", "true") - } - // here to trigger wrapping to record scheduling time - the logic is trivial so it's enough to verify - // that ClassCastExceptions do not arise from the wrapping - injectSysConfig("dd.profiling.enabled", "true") - codeOriginSetup() - } - - def "code origin test #name"() { - setup: - - def msgCount = serverMessageCount - def serverReceived = new CopyOnWriteArrayList<>() - def clientReceived = new CopyOnWriteArrayList<>() - def error = new AtomicReference() - - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - StreamObserver conversation(StreamObserver observer) { - return new StreamObserver() { - @Override - void onNext(Helloworld.Response value) { - - serverReceived << value.message - - (1..msgCount).each { - if (TEST_TRACER.isAsyncPropagationEnabled()) { - observer.onNext(value) - } else { - observer.onError(new IllegalStateException("not async propagating!")) - } - } - } - - @Override - void onError(Throwable t) { - if (TEST_TRACER.isAsyncPropagationEnabled()) { - error.set(t) - observer.onError(t) - } else { - observer.onError(new IllegalStateException("not async propagating!")) - } - } - - @Override - void onCompleted() { - if (TEST_TRACER.isAsyncPropagationEnabled()) { - observer.onCompleted() - } else { - observer.onError(new IllegalStateException("not async propagating!")) - } - } - } - } - } - Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter) - .executor(directExecutor ? MoreExecutors.directExecutor() : Executors.newCachedThreadPool()) - .build().start() - - ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() - GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady() - - when: - def streamObserver = client.conversation(new StreamObserver() { - @Override - void onNext(Helloworld.Response value) { - if (TEST_TRACER.isAsyncPropagationEnabled()) { - clientReceived << value.message - } else { - error.set(new IllegalStateException("not async propagating!")) - } - } - - @Override - void onError(Throwable t) { - if (TEST_TRACER.isAsyncPropagationEnabled()) { - error.set(t) - } else { - error.set(new IllegalStateException("not async propagating!")) - } - } - - @Override - void onCompleted() { - if (!TEST_TRACER.isAsyncPropagationEnabled()) { - error.set(new IllegalStateException("not async propagating!")) - } - } - }) - - clientRange.each { - def message = Helloworld.Response.newBuilder().setMessage("call $it").build() - streamObserver.onNext(message) - } - streamObserver.onCompleted() - - then: - error.get() == null - TEST_WRITER.waitForTraces(2) - error.get() == null - serverReceived == clientRange.collect { "call $it" } - clientReceived == serverRange.collect { - clientRange.collect { - "call $it" - } - }.flatten().sort() - - assert DebuggerContext.codeOriginRecorder != null - def span = TEST_WRITER.flatten().find { - it.operationName.toString() == "grpc.server.request" - } - assertTags(span, { - it.codeOriginTags() - }, false) - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - name | clientMessageCount | serverMessageCount | directExecutor - "A" | 1 | 1 | false - "B" | 2 | 1 | false - "C" | 1 | 2 | false - "D" | 2 | 2 | false - "E" | 3 | 3 | false - "A" | 1 | 1 | true - "B" | 2 | 1 | true - "C" | 1 | 2 | true - "D" | 2 | 2 | true - "E" | 3 | 3 | true - - clientRange = 1..clientMessageCount - serverRange = 1..serverMessageCount - } -} - -class GrpcCodeOriginForkedTest extends GrpcCodeOriginTest { - - @Override - int version() { - return 1 - } - - @Override - protected String clientOperation() { - return "grpc.client.request" - } - - @Override - protected String serverOperation() { - return "grpc.server.request" - } -} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy index fa98f4ad5b2..0b4d174b4dc 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -1,3 +1,4 @@ +import static datadog.trace.agent.test.asserts.TagsAssert.codeOriginTags import static datadog.trace.api.config.TraceInstrumentationConfig.GRPC_SERVER_ERROR_STATUSES import com.google.common.util.concurrent.MoreExecutors @@ -150,6 +151,7 @@ abstract class GrpcTest extends VersionedNamingTestBase { then: response.message == "Hello $name" + codeOriginTags(TEST_WRITER) assertTraces(2) { trace(hasClientMessageSpans() ? 3 : 2) { basicSpan(it, "parent") @@ -680,6 +682,12 @@ class GrpcDataStreamsEnabledV0Test extends GrpcDataStreamsEnabledForkedTest { class GrpcDataStreamsEnabledV1ForkedTest extends GrpcDataStreamsEnabledForkedTest { + @Override + protected void configurePreAgent() { + super.configurePreAgent() + codeOriginSetup() + } + @Override int version() { return 1 diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 60f5be62322..9e71a218b7a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -32,6 +32,9 @@ import org.springframework.kafka.test.utils.KafkaTestUtils import java.util.concurrent.ExecutionException import java.util.concurrent.Future + +import static datadog.trace.agent.test.asserts.TagsAssert.codeOriginTags + import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -227,8 +230,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { int nTraces = isDataStreamsEnabled() ? 3 : 2 int produceTraceIdx = nTraces - 1 TEST_WRITER.waitForTraces(nTraces) - def traces = (Arrays.asList(TEST_WRITER.toArray()) as List>) - Collections.sort(traces, new SortKafkaTraces()) + def traces = new ArrayList<>(TEST_WRITER) + traces.sort(new SortKafkaTraces()) + codeOriginTags(TEST_WRITER) assertTraces(nTraces, new SortKafkaTraces()) { if (hasQueueSpan()) { trace(2) { @@ -1040,6 +1044,12 @@ class KafkaClientV0ForkedTest extends KafkaClientForkedTest { } class KafkaClientV1ForkedTest extends KafkaClientForkedTest { + @Override + void configurePreAgent() { + super.configurePreAgent() + codeOriginSetup() + } + @Override int version() { 1 diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaCodeOriginForkedTest.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaCodeOriginForkedTest.groovy deleted file mode 100644 index 4f2c9414c6a..00000000000 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaCodeOriginForkedTest.groovy +++ /dev/null @@ -1,189 +0,0 @@ -import datadog.trace.agent.test.naming.VersionedNamingTestBase -import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags -import datadog.trace.common.writer.ListWriter -import datadog.trace.core.DDSpan -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.StringSerializer -import org.junit.Rule -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.listener.ContainerProperties -import org.springframework.kafka.listener.KafkaMessageListenerContainer -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.EmbeddedKafkaBroker -import org.springframework.kafka.test.rule.EmbeddedKafkaRule -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.kafka.test.utils.KafkaTestUtils - -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit - -import static datadog.trace.agent.test.asserts.TagsAssert.assertTags -import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled - -class KafkaCodeOriginForkedTest extends VersionedNamingTestBase { - static final SHARED_TOPIC = "shared.topic" - static final String MESSAGE = "Testing without headers for certain topics" - - @Rule - EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, SHARED_TOPIC) - EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka - - @Override - boolean useStrictTraceWrites() { - // TODO fix this by making sure that spans get closed properly - return false - } - - @Override - void configurePreAgent() { - super.configurePreAgent() - - injectSysConfig("dd.kafka.e2e.duration.enabled", "false") - injectSysConfig("dd.kafka.legacy.tracing.enabled", "false") - injectSysConfig("dd.service", "KafkaClientTest") - codeOriginSetup() - } - - // filter out Kafka poll, since the function is called in a loop, giving inconsistent results - final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { - @Override - boolean accept(List trace) { - return !(trace.size() == 1 && - trace.get(0).getResourceName().toString().equals("kafka.poll")) - } - } - - final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() { - @Override - boolean accept(List trace) { - return !(trace.size() == 1 && - trace.get(0).getResourceName().toString().equals("kafka.poll") && - trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0)) - } - } - - // TraceID, start times & names changed based on the configuration, so overriding the sort to give consistent test results - private static class SortKafkaTraces implements Comparator> { - @Override - int compare(List o1, List o2) { - return rootSpanTrace(o1) - rootSpanTrace(o2) - } - - int rootSpanTrace(List trace) { - assert !trace.isEmpty() - def rootSpan = trace.get(0).localRootSpan - switch (rootSpan.operationName.toString()) { - case "parent": - return 3 - case "kafka.poll": - return 2 - default: - return 1 - } - } - } - - def setup() { - TEST_WRITER.setFilter(dropKafkaPoll) - } - - @Override - int version() { - 1 - } - - @Override - String operation() { - return null - } - - String operationForProducer() { - "kafka.send" - } - - String operationForConsumer() { - return "kafka.process" - } - - String serviceForTimeInQueue() { - "kafka-queue" - } - - def "test with code origin"() { - setup: - // Create and start a Kafka container using Testcontainers - - def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) - if (isDataStreamsEnabled()) { - senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) - } - TEST_WRITER.setFilter(dropEmptyKafkaPoll) - KafkaProducer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) - // set up the Kafka consumer properties - def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false") - // create a Kafka consumer factory - def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) - // set the topic that needs to be consumed - def containerProperties = new ContainerProperties(SHARED_TOPIC) - // create a Kafka MessageListenerContainer - def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) - // create a thread safe queue to store the received message - def records = new LinkedBlockingQueue>() - // setup a Kafka message listener - container.setupMessageListener(new MessageListener() { - @Override - void onMessage(ConsumerRecord record) { - TEST_WRITER.waitForTraces(1) - // ensure consistent ordering of traces - records.add(record) - } - }) - // start the container and underlying message listener - container.start() - // wait until the container has the required number of assigned partitions - ContainerTestUtils.waitForAssignment(container, container.assignedPartitions.size()) - when: - String greeting = "Hello Spring Kafka Sender!" - runUnderTrace("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC,greeting)) { meta, ex -> - assert isAsyncPropagationEnabled() - if (ex == null) { - runUnderTrace("producer callback") {} - } else { - runUnderTrace("producer exception: " + ex) {} - } - } - blockUntilChildSpansFinished(2) - } - - then: - // // check that the message was received - def received = records.poll(10, TimeUnit.SECONDS) - received.value() == greeting - received.key() == null - int nTraces = 2 - TEST_WRITER.waitForTraces(nTraces) - def traces = (Arrays.asList(TEST_WRITER.toArray()) as List>) - Collections.sort(traces, new SortKafkaTraces()) - assertTags(traces[0][0], { - it.codeOriginTags() - }, false) - - cleanup: - producer.close() - container?.stop() - } - - @Override - String service() { - return "KafkaClientTest" - } -} - - - - diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy index 940ca35673a..a484826e84a 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy @@ -304,6 +304,7 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L ((Logger) LoggerFactory.getLogger("org.testcontainers")).setLevel(Level.DEBUG) } + def codeOriginSetup() { injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "true", true) injectSysConfig(DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE, "false", true) @@ -313,7 +314,6 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L .setService("code origin test") .build() - def config = Config.get() def probeStatusSink = new ProbeStatusSink(config, "http://datadoghq.com", false) @@ -531,6 +531,11 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L ActiveSubsystems.APPSEC_ACTIVE = originalAppSecRuntimeValue + if (Config.get().isDebuggerCodeOriginEnabled()) { + injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "false", true) + rebuildConfig() + } + try { if (enabledFinishTimingChecks()) { doCheckRepeatedFinish() diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy index 47d959771e5..8db86d22f01 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy @@ -6,6 +6,7 @@ import datadog.trace.api.DDTags import datadog.trace.api.naming.SpanNaming import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.common.sampling.RateByServiceTraceSampler +import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ddagent.TraceMapper import datadog.trace.core.DDSpan import groovy.transform.stc.ClosureParams @@ -89,6 +90,9 @@ class TagsAssert { assertedTags.add(DDTags.DJM_ENABLED) assertedTags.add(DDTags.PARENT_ID) assertedTags.add(DDTags.SPAN_LINKS) // this is checked by LinksAsserter + DDTags.REQUIRED_CODE_ORIGIN_TAGS.each { + assertedTags.add(it) + } assert tags["thread.name"] != null assert tags["thread.id"] != null @@ -127,9 +131,26 @@ class TagsAssert { } } - def codeOriginTags() { - DDTags.REQUIRED_CODE_ORIGIN_TAGS.each { - assert tags[it] != null + static void codeOriginTags(ListWriter writer) { + if (Config.get().isDebuggerCodeOriginEnabled()) { + def traces = new ArrayList<>(writer) //as List> + + def spans = [] + traces.each { + it.each { + if (it.tags[DDTags.DD_CODE_ORIGIN_TYPE] != null) { + spans += it + } + } + } + assert !spans.isEmpty(): "Should have found at least one span with code origin" + spans.each { + assertTags(it, { + DDTags.REQUIRED_CODE_ORIGIN_TAGS.each { + assert tags[it] != null: "Should have found ${it} in span tags: " + tags.keySet() + } + }, false) + } } }