Fix service name overrides in consumers #8387

Merged
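This change switches the data streams service name override from a String snapshot taken when the propagator and extractor are constructed to a ThreadLocal&lt;String&gt; that is dereferenced at extraction time, so consumers running on other threads (such as Kafka Connect sink tasks) pick up the override set for their own thread. A new standalone Connect sink test exercises the behavior end to end.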
@@ -6,6 +6,9 @@ import org.apache.kafka.clients.admin.DescribeClusterResult
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.utils.Time
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -176,6 +179,137 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
tempFile?.delete()
}

def "test kafka-connect sink instrumentation"() {
String bootstrapServers = embeddedKafka.getBrokersAsString()

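// Resolve the Kafka cluster id up front; the StandaloneHerder created below requires it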
Properties adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
String clusterId = null
try (AdminClient adminClient = AdminClient.create(adminProps)) {
DescribeClusterResult describeClusterResult = adminClient.describeCluster()
clusterId = describeClusterResult.clusterId().get()
}
assert clusterId != null : "Cluster ID is null"

// Create a temporary file where the sink connector should write
File sinkFile = File.createTempFile("sink-messages", ".txt")
if (sinkFile.exists()) {
sinkFile.delete()
}
sinkFile.deleteOnExit()

Properties workerProps = new Properties()
workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter")
workerProps.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/connect.offsets")
workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter")
workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "") // Required but can be empty for built-in connectors
workerProps.put("plugin.scan.classpath", "true")

Map<String, String> workerPropsMap = workerProps.stringPropertyNames()
.collectEntries { [(it): workerProps.getProperty(it)] }

// Create the Connect worker
Time time = Time.SYSTEM
Plugins plugins = new Plugins(workerPropsMap)
plugins.compareAndSwapWithDelegatingLoader()
String workerId = "worker-1"

FileOffsetBackingStore offsetBackingStore = new FileOffsetBackingStore()
WorkerConfig workerConfig = new StandaloneConfig(workerPropsMap)
offsetBackingStore.configure(workerConfig)
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy()
Worker worker = new Worker(workerId, time, plugins, workerConfig, offsetBackingStore, connectorClientConfigOverridePolicy)
Herder herder = new StandaloneHerder(worker, clusterId, connectorClientConfigOverridePolicy)

// Start worker and herder
worker.start()
herder.start()

// Create the sink connector configuration
Map<String, String> connectorProps = [
'name' : 'file-sink-connector',
'connector.class': 'org.apache.kafka.connect.file.FileStreamSinkConnector',
'tasks.max' : '1',
'file' : sinkFile.getAbsolutePath(),
'topics' : 'test-topic'
]

// Latch to wait for connector addition
CountDownLatch connectorAddedLatch = new CountDownLatch(1)
Callback<Herder.Created<ConnectorInfo>> addConnectorCallback = new Callback<Herder.Created<ConnectorInfo>>() {
@Override
void onCompletion(Throwable error, Herder.Created<ConnectorInfo> result) {
if (error != null) {
error.printStackTrace()
} else {
println "Sink connector added successfully."
}
connectorAddedLatch.countDown()
}
}

when:
// Add the sink connector to the herder
herder.putConnectorConfig("file-sink-connector", connectorProps, false, addConnectorCallback)

// Wait for the connector to be added
boolean connectorAdded = connectorAddedLatch.await(10, TimeUnit.SECONDS)
assert connectorAdded : "Sink connector was not added in time"

// Produce a message to the topic that we expect to be written to the file
Properties producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)
producer.send(new ProducerRecord<>("test-topic", "key1", "Hello Kafka Sink"))
producer.flush()
producer.close()

for (int i = 0; i < 100; i++) { // Try for up to 10 seconds
Thread.sleep(100)
if (sinkFile.text.contains("Hello Kafka Sink")) {
break
}
}

String fileContents = sinkFile.text
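// Two stats groups are expected: the producer's out edge and the sink connector consumer's in edge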
TEST_DATA_STREAMS_WRITER.waitForGroups(2)

then:
fileContents.contains("Hello Kafka Sink")

StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
verifyAll(first) {
assert [
"direction:out",
"topic:test-topic",
"type:kafka"
].every( tag -> edgeTags.contains(tag) )
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
verifyAll(second) {
assert [
"direction:in",
"group:connect-file-sink-connector",
"topic:test-topic",
"type:kafka"
].every( tag -> edgeTags.contains(tag) )
}
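// With the per-thread override applied, the connector's consumer reports under the connector name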
TEST_DATA_STREAMS_WRITER.getServices().contains('file-sink-connector')

cleanup:
herder?.stop()
worker?.stop()
sinkFile?.delete()
}

@Override
protected boolean isDataStreamsEnabled() {
return true
@@ -12,14 +12,14 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
private final TimeSource timeSource;
private final Supplier<TraceConfig> traceConfigSupplier;
private final long hashOfKnownTags;
- private final String serviceNameOverride;
+ private final ThreadLocal<String> serviceNameOverride;

public DataStreamContextExtractor(
HttpCodec.Extractor delegate,
TimeSource timeSource,
Supplier<TraceConfig> traceConfigSupplier,
long hashOfKnownTags,
- String serviceNameOverride) {
+ ThreadLocal<String> serviceNameOverride) {
this.delegate = delegate;
this.timeSource = timeSource;
this.traceConfigSupplier = traceConfigSupplier;
@@ -41,7 +41,7 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> getter) {
if (shouldExtractPathwayContext) {
DefaultPathwayContext pathwayContext =
DefaultPathwayContext.extract(
- carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+ carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());

extracted.withPathwayContext(pathwayContext);
}
@@ -50,7 +50,7 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> getter) {
} else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
DefaultPathwayContext pathwayContext =
DefaultPathwayContext.extract(
- carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
+ carrier, getter, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());

if (pathwayContext != null) {
extracted = new TagContext();
@@ -20,13 +20,13 @@ public class DataStreamPropagator implements Propagator {
private final Supplier<TraceConfig> traceConfigSupplier;
private final TimeSource timeSource;
private final long hashOfKnownTags;
- private final String serviceNameOverride;
+ private final ThreadLocal<String> serviceNameOverride;

public DataStreamPropagator(
Supplier<TraceConfig> traceConfigSupplier,
TimeSource timeSource,
long hashOfKnownTags,
- String serviceNameOverride) {
+ ThreadLocal<String> serviceNameOverride) {
this.traceConfigSupplier = traceConfigSupplier;
this.timeSource = timeSource;
this.hashOfKnownTags = hashOfKnownTags;
@@ -78,6 +78,6 @@ private boolean isDsmEnabled(@Nullable TagContext tagContext) {

private <C> PathwayContext extractDsmPathwayContext(C carrier, CarrierVisitor<C> visitor) {
return DefaultPathwayContext.extract(
- carrier, visitor, this.timeSource, this.hashOfKnownTags, this.serviceNameOverride);
+ carrier, visitor, this.timeSource, this.hashOfKnownTags, serviceNameOverride.get());
}
}
@@ -204,13 +204,13 @@ public PathwayContext newPathwayContext() {
@Override
public Propagator propagator() {
return new DataStreamPropagator(
- this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
+ this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, serviceNameOverride);
}

@Override
public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
return new DataStreamContextExtractor(
- delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceName());
+ delegate, timeSource, traceConfigSupplier, hashOfKnownTags, serviceNameOverride);
}

@Override
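For illustration, here is a minimal sketch of why passing the ThreadLocal and calling get() at use time behaves differently from snapshotting the value at construction time, which is the essence of the fix above. This is not dd-trace-java code; the ThreadLocal, supplier, and thread names are invented for the example.

```java
import java.util.function.Supplier;

public class ThreadLocalOverrideSketch {
  // Hypothetical per-thread override, playing the role of serviceNameOverride.
  static final ThreadLocal<String> SERVICE_NAME = new ThreadLocal<>();

  public static void main(String[] args) throws Exception {
    // Old behavior: the value is snapshotted on the constructing thread,
    // where no override is set, so every later read returns null.
    String snapshot = SERVICE_NAME.get();
    Supplier<String> broken = () -> snapshot;

    // New behavior: the ThreadLocal itself is handed around and read
    // on whichever thread performs the extraction.
    Supplier<String> fixed = SERVICE_NAME::get;

    Thread consumerThread = new Thread(() -> {
      SERVICE_NAME.set("file-sink-connector");
      System.out.println("snapshot at construction: " + broken.get()); // null
      System.out.println("read at use time: " + fixed.get());          // file-sink-connector
    });
    consumerThread.start();
    consumerThread.join();
  }
}
```

This mirrors the diff: the constructors now receive the ThreadLocal instead of the result of getThreadServiceName(), and serviceNameOverride.get() is called inside extract() on the consumer's thread.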