From d7e270f8fda328df7762c484a2e8981bc95f4cfe Mon Sep 17 00:00:00 2001
From: Piotr Wolski
Date: Thu, 13 Feb 2025 19:56:17 -0700
Subject: [PATCH 1/2] Fix service name overrides in consumers

---
 .../core/datastreams/DataStreamContextExtractor.java   | 8 ++++----
 .../trace/core/datastreams/DataStreamPropagator.java   | 6 +++---
 .../core/datastreams/DefaultDataStreamsMonitoring.java | 4 ++--
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
index f831d1cf10d..d438a4eacb9 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java
@@ -12,14 +12,14 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
   private final TimeSource timeSource;
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final long hashOfKnownTags;
-  private final String serviceNameOverride;
+  private final ThreadLocal<String> serviceNameOverride;
 
   public DataStreamContextExtractor(
       HttpCodec.Extractor delegate,
       TimeSource timeSource,
       Supplier<TraceConfig> traceConfigSupplier,
       long hashOfKnownTags,
-      String serviceNameOverride) {
+      ThreadLocal<String> serviceNameOverride) {
     this.delegate = delegate;
     this.timeSource = timeSource;
     this.traceConfigSupplier = traceConfigSupplier;
@@ -41,7 +41,7 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett
       if (shouldExtractPathwayContext) {
         DefaultPathwayContext pathwayContext =
             DefaultPathwayContext.extract(
-                carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+                carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
 
         extracted.withPathwayContext(pathwayContext);
       }
@@ -50,7 +50,7 @@ public TagContext extract(C carrier, AgentPropagation.ContextVisitor gett
     } else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
       DefaultPathwayContext pathwayContext =
           DefaultPathwayContext.extract(
-              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
 
       if (pathwayContext != null) {
         extracted = new TagContext();
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java
index 7fdedfad381..8807066cb0b 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java
@@ -20,13 +20,13 @@ public class DataStreamPropagator implements Propagator {
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final TimeSource timeSource;
   private final long hashOfKnownTags;
-  private final String serviceNameOverride;
+  private final ThreadLocal<String> serviceNameOverride;
 
   public DataStreamPropagator(
       Supplier<TraceConfig> traceConfigSupplier,
       TimeSource timeSource,
       long hashOfKnownTags,
-      String serviceNameOverride) {
+      ThreadLocal<String> serviceNameOverride) {
     this.traceConfigSupplier = traceConfigSupplier;
     this.timeSource = timeSource;
     this.hashOfKnownTags = hashOfKnownTags;
@@ -78,6 +78,6 @@ private boolean isDsmEnabled(@Nullable TagContext tagContext) {
 
   private <C> PathwayContext extractDsmPathwayContext(C carrier, CarrierVisitor<C> visitor) {
     return DefaultPathwayContext.extract(
-        carrier, visitor, this.timeSource, this.hashOfKnownTags, this.serviceNameOverride);
+        carrier, visitor, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
   }
 }
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
index 2c8a897e762..deaa925c6b0 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
@@ -204,13 +204,13 @@ public PathwayContext newPathwayContext() {
   @Override
   public Propagator propagator() {
     return new DataStreamPropagator(
-        this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
+        this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
   }
 
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName());
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, serviceNameOverride);
   }
 
   @Override

From 0b0527fc85d8b1b2d64b16ffa92483b59f583a5d Mon Sep 17 00:00:00 2001
From: Piotr Wolski
Date: Tue, 18 Feb 2025 15:02:46 -0700
Subject: [PATCH 2/2] Add test for sink connector to test service name overrides

---
 .../ConnectWorkerInstrumentationTest.groovy | 134 ++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy
index 48c69d7d115..244a5213ff2 100644
--- a/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy
+++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy
@@ -6,6 +6,9 @@ import org.apache.kafka.clients.admin.DescribeClusterResult
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -176,6 +179,137 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
     tempFile?.delete()
   }
 
+  def "test kafka-connect sink instrumentation"() {
+    String bootstrapServers = embeddedKafka.getBrokersAsString()
+
+    Properties adminProps = new Properties()
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    String clusterId = null
+    try (AdminClient adminClient = AdminClient.create(adminProps)) {
+      DescribeClusterResult describeClusterResult = adminClient.describeCluster()
+      clusterId = describeClusterResult.clusterId().get()
+    }
+    assert clusterId != null : "Cluster ID is null"
+
+    // Create a temporary file where the sink connector should write
+    File sinkFile = File.createTempFile("sink-messages", ".txt")
+    if (sinkFile.exists()) {
+      sinkFile.delete()
+    }
+    sinkFile.deleteOnExit()
+
+    Properties workerProps = new Properties()
+    workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
+    workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
+    workerProps.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/connect.offsets")
+    workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
+    workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
+    workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "") // Required but can be empty for built-in connectors
+    workerProps.put("plugin.scan.classpath", "true")
+
+    Map<String, String> workerPropsMap = workerProps.stringPropertyNames()
+      .collectEntries { [(it): workerProps.getProperty(it)] }
+
+    // Create the Connect worker
+    Time time = Time.SYSTEM
+    Plugins plugins = new Plugins(workerPropsMap)
+    plugins.compareAndSwapWithDelegatingLoader()
+    String workerId = "worker-1"
+
+    FileOffsetBackingStore offsetBackingStore = new FileOffsetBackingStore()
+    WorkerConfig workerConfig = new StandaloneConfig(workerPropsMap)
+    offsetBackingStore.configure(workerConfig)
+    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy()
+    Worker worker = new Worker(workerId, time, plugins, workerConfig, offsetBackingStore, connectorClientConfigOverridePolicy)
+    Herder herder = new StandaloneHerder(worker, clusterId, connectorClientConfigOverridePolicy)
+
+    // Start worker and herder
+    worker.start()
+    herder.start()
+
+    // Create the sink connector configuration
+    Map<String, String> connectorProps = [
+      'name'           : 'file-sink-connector',
+      'connector.class': 'org.apache.kafka.connect.file.FileStreamSinkConnector',
+      'tasks.max'      : '1',
+      'file'           : sinkFile.getAbsolutePath(),
+      'topics'         : 'test-topic'
+    ]
+
+    // Latch to wait for connector addition
+    CountDownLatch connectorAddedLatch = new CountDownLatch(1)
+    Callback<Herder.Created<ConnectorInfo>> addConnectorCallback = new Callback<Herder.Created<ConnectorInfo>>() {
+      @Override
+      void onCompletion(Throwable error, Herder.Created<ConnectorInfo> result) {
+        if (error != null) {
+          error.printStackTrace()
+        } else {
+          println "Sink connector added successfully."
+        }
+        connectorAddedLatch.countDown()
+      }
+    }
+
+    when:
+    // Add the sink connector to the herder
+    herder.putConnectorConfig("file-sink-connector", connectorProps, false, addConnectorCallback)
+
+    // Wait for the connector to be added
+    boolean connectorAdded = connectorAddedLatch.await(10, TimeUnit.SECONDS)
+    assert connectorAdded : "Sink connector was not added in time"
+
+    // Produce a message to the topic that we expect to be written to the file
+    Properties producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+
+    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)
+    producer.send(new ProducerRecord<>("test-topic", "key1", "Hello Kafka Sink"))
+    producer.flush()
+    producer.close()
+
+    for (int i = 0; i < 100; i++) { // Try for up to 10 seconds
+      Thread.sleep(100)
+      if (sinkFile.text.contains("Hello Kafka Sink")) {
+        break
+      }
+    }
+
+    String fileContents = sinkFile.text
+    TEST_DATA_STREAMS_WRITER.waitForGroups(2)
+
+    then:
+    fileContents.contains("Hello Kafka Sink")
+
+    StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
+    verifyAll(first) {
+      assert [
+        "direction:out",
+        "topic:test-topic",
+        "type:kafka"
+      ].every( tag -> edgeTags.contains(tag) )
+    }
+
+    StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
+    verifyAll(second) {
+      assert [
+        "direction:in",
+        "group:connect-file-sink-connector",
+        "topic:test-topic",
+        "type:kafka"
+      ].every( tag -> edgeTags.contains(tag) )
+    }
+    TEST_DATA_STREAMS_WRITER.getServices().contains('file-sink-connector')
+
+    cleanup:
+    herder?.stop()
+    worker?.stop()
+    sinkFile?.delete()
+  }
+
   @Override
   protected boolean isDataStreamsEnabled() {
     return true