Commit e6c6eaf

Fix service name overrides in consumers (#8387)
* Fix service name overrides in consumers
1 parent 27b2d68 commit e6c6eaf
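
The change itself is small: the data-streams propagator and extractor previously captured the service-name override as a plain String at construction time, so consumer work running on other threads (such as Kafka Connect sink tasks) could observe a stale or missing value. The fix, visible in the diffs below, passes the ThreadLocal<String> holder through and defers resolution to .get() at extraction/propagation time. Here is a minimal, self-contained Java sketch of that difference; the names are hypothetical, not code from this repo:

import java.util.function.Supplier;

// Minimal sketch of the bug pattern and its fix; all names are hypothetical.
public class ServiceNameOverrideSketch {
  // One slot per thread: each worker/consumer thread can hold its own override.
  static final ThreadLocal<String> SERVICE_NAME_OVERRIDE = new ThreadLocal<>();

  // Before: the override is frozen on whichever thread builds the component.
  static Supplier<String> capturedAtConstruction() {
    String frozen = SERVICE_NAME_OVERRIDE.get(); // read once, on the wrong thread
    return () -> frozen;
  }

  // After: the holder is passed along and read on the thread doing the work.
  static Supplier<String> resolvedAtUse(ThreadLocal<String> holder) {
    return holder::get; // evaluated per call, on the calling thread
  }

  public static void main(String[] args) throws InterruptedException {
    Supplier<String> frozen = capturedAtConstruction(); // built on main
    Supplier<String> live = resolvedAtUse(SERVICE_NAME_OVERRIDE);

    Thread consumer = new Thread(() -> {
      SERVICE_NAME_OVERRIDE.set("file-sink-connector");
      System.out.println("frozen: " + frozen.get()); // null - captured too early
      System.out.println("live:   " + live.get());   // file-sink-connector
    });
    consumer.start();
    consumer.join();
  }
}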

File tree

4 files changed: +143 -9 lines changed

dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy

Lines changed: 134 additions & 0 deletions

@@ -6,6 +6,9 @@ import org.apache.kafka.clients.admin.DescribeClusterResult
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -176,6 +179,137 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
     tempFile?.delete()
   }
 
+  def "test kafka-connect sink instrumentation"() {
+    String bootstrapServers = embeddedKafka.getBrokersAsString()
+
+    Properties adminProps = new Properties()
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    String clusterId = null
+    try (AdminClient adminClient = AdminClient.create(adminProps)) {
+      DescribeClusterResult describeClusterResult = adminClient.describeCluster()
+      clusterId = describeClusterResult.clusterId().get()
+    }
+    assert clusterId != null : "Cluster ID is null"
+
+    // Create a temporary file where the sink connector should write
+    File sinkFile = File.createTempFile("sink-messages", ".txt")
+    if (sinkFile.exists()) {
+      sinkFile.delete()
+    }
+    sinkFile.deleteOnExit()
+
+    Properties workerProps = new Properties()
+    workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
+    workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
+    workerProps.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/connect.offsets")
+    workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
+    workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
+    workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "") // Required but can be empty for built-in connectors
+    workerProps.put("plugin.scan.classpath", "true")
+
+    Map<String, String> workerPropsMap = workerProps.stringPropertyNames()
+      .collectEntries { [(it): workerProps.getProperty(it)] }
+
+    // Create the Connect worker
+    Time time = Time.SYSTEM
+    Plugins plugins = new Plugins(workerPropsMap)
+    plugins.compareAndSwapWithDelegatingLoader()
+    String workerId = "worker-1"
+
+    FileOffsetBackingStore offsetBackingStore = new FileOffsetBackingStore()
+    WorkerConfig workerConfig = new StandaloneConfig(workerPropsMap)
+    offsetBackingStore.configure(workerConfig)
+    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy()
+    Worker worker = new Worker(workerId, time, plugins, workerConfig, offsetBackingStore, connectorClientConfigOverridePolicy)
+    Herder herder = new StandaloneHerder(worker, clusterId, connectorClientConfigOverridePolicy)
+
+    // Start worker and herder
+    worker.start()
+    herder.start()
+
+    // Create the sink connector configuration
+    Map<String, String> connectorProps = [
+      'name'           : 'file-sink-connector',
+      'connector.class': 'org.apache.kafka.connect.file.FileStreamSinkConnector',
+      'tasks.max'      : '1',
+      'file'           : sinkFile.getAbsolutePath(),
+      'topics'         : 'test-topic'
+    ]
+
+    // Latch to wait for connector addition
+    CountDownLatch connectorAddedLatch = new CountDownLatch(1)
+    Callback<Herder.Created<ConnectorInfo>> addConnectorCallback = new Callback<Herder.Created<ConnectorInfo>>() {
+      @Override
+      void onCompletion(Throwable error, Herder.Created<ConnectorInfo> result) {
+        if (error != null) {
+          error.printStackTrace()
+        } else {
+          println "Sink connector added successfully."
+        }
+        connectorAddedLatch.countDown()
+      }
+    }
+
+    when:
+    // Add the sink connector to the herder
+    herder.putConnectorConfig("file-sink-connector", connectorProps, false, addConnectorCallback)
+
+    // Wait for the connector to be added
+    boolean connectorAdded = connectorAddedLatch.await(10, TimeUnit.SECONDS)
+    assert connectorAdded : "Sink connector was not added in time"
+
+    // Produce a message to the topic that we expect to be written to the file
+    Properties producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+
+    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)
+    producer.send(new ProducerRecord<>("test-topic", "key1", "Hello Kafka Sink"))
+    producer.flush()
+    producer.close()
+
+    for (int i = 0; i < 100; i++) { // Try for up to 10 seconds
+      Thread.sleep(100)
+      if (sinkFile.text.contains("Hello Kafka Sink")) {
+        break
+      }
+    }
+
+    String fileContents = sinkFile.text
+    TEST_DATA_STREAMS_WRITER.waitForGroups(2)
+
+    then:
+    fileContents.contains("Hello Kafka Sink")
+
+    StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
+    verifyAll(first) {
+      assert [
+        "direction:out",
+        "topic:test-topic",
+        "type:kafka"
+      ].every( tag -> edgeTags.contains(tag) )
+    }
+
+    StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
+    verifyAll(second) {
+      assert [
+        "direction:in",
+        "group:connect-file-sink-connector",
+        "topic:test-topic",
+        "type:kafka"
+      ].every( tag -> edgeTags.contains(tag) )
+    }
+    TEST_DATA_STREAMS_WRITER.getServices().contains('file-sink-connector')
+
+    cleanup:
+    herder?.stop()
+    worker?.stop()
+    sinkFile?.delete()
+  }
+
   @Override
   protected boolean isDataStreamsEnabled() {
     return true
dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java

Lines changed: 4 additions & 4 deletions

@@ -12,14 +12,14 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
   private final TimeSource timeSource;
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final long hashOfKnownTags;
-  private final String serviceNameOverride;
+  private final ThreadLocal<String> serviceNameOverride;
 
   public DataStreamContextExtractor(
       HttpCodec.Extractor delegate,
       TimeSource timeSource,
       Supplier<TraceConfig> traceConfigSupplier,
       long hashOfKnownTags,
-      String serviceNameOverride) {
+      ThreadLocal<String> serviceNameOverride) {
     this.delegate = delegate;
     this.timeSource = timeSource;
     this.traceConfigSupplier = traceConfigSupplier;
@@ -41,7 +41,7 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> gett
     if (shouldExtractPathwayContext) {
       DefaultPathwayContext pathwayContext =
           DefaultPathwayContext.extract(
-              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
 
       extracted.withPathwayContext(pathwayContext);
     }
@@ -50,7 +50,7 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> gett
     } else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
       DefaultPathwayContext pathwayContext =
           DefaultPathwayContext.extract(
-              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+              carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
 
       if (pathwayContext != null) {
         extracted = new TagContext();

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java

Lines changed: 3 additions & 3 deletions

@@ -20,13 +20,13 @@ public class DataStreamPropagator implements Propagator {
   private final Supplier<TraceConfig> traceConfigSupplier;
   private final TimeSource timeSource;
   private final long hashOfKnownTags;
-  private final String serviceNameOverride;
+  private final ThreadLocal<String> serviceNameOverride;
 
   public DataStreamPropagator(
       Supplier<TraceConfig> traceConfigSupplier,
       TimeSource timeSource,
       long hashOfKnownTags,
-      String serviceNameOverride) {
+      ThreadLocal<String> serviceNameOverride) {
     this.traceConfigSupplier = traceConfigSupplier;
     this.timeSource = timeSource;
     this.hashOfKnownTags = hashOfKnownTags;
@@ -78,6 +78,6 @@ private boolean isDsmEnabled(@Nullable TagContext tagContext) {
 
   private <C> PathwayContext extractDsmPathwayContext(C carrier, CarrierVisitor<C> visitor) {
     return DefaultPathwayContext.extract(
-        carrier, visitor, this.timeSource, this.hashOfKnownTags, this.serviceNameOverride);
+        carrier, visitor, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
   }
 }

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 2 additions & 2 deletions

@@ -204,13 +204,13 @@ public PathwayContext newPathwayContext() {
   @Override
   public Propagator propagator() {
     return new DataStreamPropagator(
-        this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
+        this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
   }
 
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, serviceNameOverride);
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, serviceNameOverride);
   }
 
   @Override
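
With this wiring, propagator() and extractor() hand over the mutable holder itself, so the override is read on whichever thread later performs the propagation or extraction, rather than snapshotted via getThreadServiceName() when the factory runs. One general caveat with ThreadLocal overrides (a standard Java concern, not something this diff addresses): the slot should be cleared when the override's scope ends, or pooled threads can carry a stale service name into unrelated work. A hypothetical scope helper:

// Hypothetical scope helper; standard Java hygiene, not code from this diff.
public class OverrideScopeSketch {
  static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();

  static void runWithOverride(String name, Runnable work) {
    serviceNameOverride.set(name);
    try {
      work.run(); // everything here sees serviceNameOverride.get() == name
    } finally {
      serviceNameOverride.remove(); // don't leak the override to reused threads
    }
  }

  public static void main(String[] args) {
    runWithOverride("file-sink-connector",
        () -> System.out.println(serviceNameOverride.get()));
    System.out.println(serviceNameOverride.get()); // null - cleared after scope
  }
}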
