@@ -6,6 +6,9 @@ import org.apache.kafka.clients.admin.DescribeClusterResult
6
6
import org.apache.kafka.clients.consumer.ConsumerConfig
7
7
import org.apache.kafka.clients.consumer.ConsumerRecords
8
8
import org.apache.kafka.clients.consumer.KafkaConsumer
9
+ import org.apache.kafka.clients.producer.KafkaProducer
10
+ import org.apache.kafka.clients.producer.ProducerConfig
11
+ import org.apache.kafka.clients.producer.ProducerRecord
9
12
import org.apache.kafka.common.utils.Time
10
13
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
11
14
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -176,6 +179,137 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
176
179
tempFile?. delete()
177
180
}
178
181
182
+ def " test kafka-connect sink instrumentation" () {
183
+ String bootstrapServers = embeddedKafka. getBrokersAsString()
184
+
185
+ Properties adminProps = new Properties ()
186
+ adminProps. put(AdminClientConfig . BOOTSTRAP_SERVERS_CONFIG , bootstrapServers)
187
+ String clusterId = null
188
+ try (AdminClient adminClient = AdminClient . create(adminProps)) {
189
+ DescribeClusterResult describeClusterResult = adminClient. describeCluster()
190
+ clusterId = describeClusterResult. clusterId(). get()
191
+ }
192
+ assert clusterId != null : " Cluster ID is null"
193
+
194
+ // Create a temporary file where the sink connector should write
195
+ File sinkFile = File . createTempFile(" sink-messages" , " .txt" )
196
+ if (sinkFile. exists()) {
197
+ sinkFile. delete()
198
+ }
199
+ sinkFile. deleteOnExit()
200
+
201
+ Properties workerProps = new Properties ()
202
+ workerProps. put(WorkerConfig . BOOTSTRAP_SERVERS_CONFIG , bootstrapServers)
203
+ workerProps. put(WorkerConfig . KEY_CONVERTER_CLASS_CONFIG , " org.apache.kafka.connect.storage.StringConverter" )
204
+ workerProps. put(WorkerConfig . VALUE_CONVERTER_CLASS_CONFIG , " org.apache.kafka.connect.storage.StringConverter" )
205
+ workerProps. put(StandaloneConfig . OFFSET_STORAGE_FILE_FILENAME_CONFIG , " /tmp/connect.offsets" )
206
+ workerProps. put(WorkerConfig . INTERNAL_KEY_CONVERTER_CLASS_CONFIG , " org.apache.kafka.connect.json.JsonConverter" )
207
+ workerProps. put(WorkerConfig . INTERNAL_VALUE_CONVERTER_CLASS_CONFIG , " org.apache.kafka.connect.json.JsonConverter" )
208
+ workerProps. put(WorkerConfig . PLUGIN_PATH_CONFIG , " " ) // Required but can be empty for built-in connectors
209
+ workerProps. put(" plugin.scan.classpath" , " true" )
210
+
211
+ Map<String , String > workerPropsMap = workerProps. stringPropertyNames()
212
+ .collectEntries { [(it): workerProps. getProperty(it)] }
213
+
214
+ // Create the Connect worker
215
+ Time time = Time . SYSTEM
216
+ Plugins plugins = new Plugins (workerPropsMap)
217
+ plugins. compareAndSwapWithDelegatingLoader()
218
+ String workerId = " worker-1"
219
+
220
+ FileOffsetBackingStore offsetBackingStore = new FileOffsetBackingStore ()
221
+ WorkerConfig workerConfig = new StandaloneConfig (workerPropsMap)
222
+ offsetBackingStore. configure(workerConfig)
223
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy ()
224
+ Worker worker = new Worker (workerId, time, plugins, workerConfig, offsetBackingStore, connectorClientConfigOverridePolicy)
225
+ Herder herder = new StandaloneHerder (worker, clusterId, connectorClientConfigOverridePolicy)
226
+
227
+ // Start worker and herder
228
+ worker. start()
229
+ herder. start()
230
+
231
+ // Create the sink connector configuration
232
+ Map<String , String > connectorProps = [
233
+ ' name' : ' file-sink-connector' ,
234
+ ' connector.class' : ' org.apache.kafka.connect.file.FileStreamSinkConnector' ,
235
+ ' tasks.max' : ' 1' ,
236
+ ' file' : sinkFile. getAbsolutePath(),
237
+ ' topics' : ' test-topic'
238
+ ]
239
+
240
+ // Latch to wait for connector addition
241
+ CountDownLatch connectorAddedLatch = new CountDownLatch (1 )
242
+ Callback<Herder.Created <ConnectorInfo> > addConnectorCallback = new Callback<Herder.Created <ConnectorInfo> > () {
243
+ @Override
244
+ void onCompletion (Throwable error , Herder.Created <ConnectorInfo > result ) {
245
+ if (error != null ) {
246
+ error. printStackTrace()
247
+ } else {
248
+ println " Sink connector added successfully."
249
+ }
250
+ connectorAddedLatch. countDown()
251
+ }
252
+ }
253
+
254
+ when :
255
+ // Add the sink connector to the herder
256
+ herder. putConnectorConfig(" file-sink-connector" , connectorProps, false , addConnectorCallback)
257
+
258
+ // Wait for the connector to be added
259
+ boolean connectorAdded = connectorAddedLatch. await(10 , TimeUnit . SECONDS )
260
+ assert connectorAdded : " Sink connector was not added in time"
261
+
262
+ // Produce a message to the topic that we expect to be written to the file
263
+ Properties producerProps = new Properties ()
264
+ producerProps. put(ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , bootstrapServers)
265
+ producerProps. put(ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , " org.apache.kafka.common.serialization.StringSerializer" )
266
+ producerProps. put(ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , " org.apache.kafka.common.serialization.StringSerializer" )
267
+
268
+ KafkaProducer<String , String > producer = new KafkaProducer<> (producerProps)
269
+ producer. send(new ProducerRecord<> (" test-topic" , " key1" , " Hello Kafka Sink" ))
270
+ producer. flush()
271
+ producer. close()
272
+
273
+ for (int i = 0 ; i < 100 ; i++ ) { // Try for up to 10 seconds
274
+ Thread . sleep(100 )
275
+ if (sinkFile. text. contains(" Hello Kafka Sink" )) {
276
+ break
277
+ }
278
+ }
279
+
280
+ String fileContents = sinkFile. text
281
+ TEST_DATA_STREAMS_WRITER . waitForGroups(2 )
282
+
283
+ then :
284
+ fileContents. contains(" Hello Kafka Sink" )
285
+
286
+ StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
287
+ verifyAll(first) {
288
+ assert [
289
+ " direction:out" ,
290
+ " topic:test-topic" ,
291
+ " type:kafka"
292
+ ]. every( tag -> edgeTags. contains(tag) )
293
+ }
294
+
295
+ StatsGroup second = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == first. hash }
296
+ verifyAll(second) {
297
+ assert [
298
+ " direction:in" ,
299
+ " group:connect-file-sink-connector" ,
300
+ " topic:test-topic" ,
301
+ " type:kafka"
302
+ ]. every( tag -> edgeTags. contains(tag) )
303
+ }
304
+ TEST_DATA_STREAMS_WRITER . getServices(). contains(' file-sink-connector' )
305
+
306
+
307
+ cleanup :
308
+ herder?. stop()
309
+ worker?. stop()
310
+ sinkFile?. delete()
311
+ }
312
+
179
313
@Override
180
314
protected boolean isDataStreamsEnabled () {
181
315
return true
0 commit comments