Skip to content

Commit 531d358

Browse files
authored
Merge pull request #209 from andreezy777/update_kafka_container
chore: get rid of deprecated class
2 parents 29e74b2 + 55c2580 commit 531d358

File tree

2 files changed

+22
-43
lines changed

2 files changed

+22
-43
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,29 @@
11
package com.dimafeng.testcontainers
22

33
import org.testcontainers.containers.{KafkaContainer => JavaKafkaContainer}
4+
import org.testcontainers.utility.DockerImageName
45

5-
/** Scala wrapper for the Java testcontainers `KafkaContainer`.
  *
  * Starts a single Kafka broker from the given docker image. Replaces the
  * older constructor that took a Confluent platform version and an optional
  * external Zookeeper address (that API relied on deprecated upstream methods).
  *
  * @param dockerImageName the Kafka docker image to run; defaults to
  *                        [[KafkaContainer.defaultDockerImageName]]
  */
case class KafkaContainer(
    dockerImageName: DockerImageName = DockerImageName.parse(KafkaContainer.defaultDockerImageName)
) extends SingleContainer[JavaKafkaContainer] {

  // The underlying Java container; all lifecycle management is delegated to it.
  override val container: JavaKafkaContainer = new JavaKafkaContainer(dockerImageName)

  /** Bootstrap server address(es) for clients; valid only after the container has started. */
  def bootstrapServers: String = container.getBootstrapServers
}
2713

2814
/** Companion object holding image defaults and the lazy container definition. */
object KafkaContainer {

  // Default image coordinates used when no explicit DockerImageName is supplied.
  val defaultImage = "confluentinc/cp-kafka"
  val defaultTag = "5.2.1"
  val defaultDockerImageName = s"$defaultImage:$defaultTag"

  /** Deferred definition of a [[KafkaContainer]].
    *
    * `createContainer()` builds a fresh container instance each time it is
    * called; nothing is started until the returned container is started.
    *
    * @param dockerImageName the Kafka docker image to run; defaults to
    *                        [[KafkaContainer.defaultDockerImageName]]
    */
  case class Def(
      dockerImageName: DockerImageName = DockerImageName.parse(KafkaContainer.defaultDockerImageName)
  ) extends ContainerDef {

    override type Container = KafkaContainer

    override def createContainer(): KafkaContainer =
      new KafkaContainer(dockerImageName)
  }
}

modules/kafka/src/test/scala/com/dimafeng/testcontainers/integration/SchemaRegistrySpec.scala

+12-11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
77
import org.scalatest.flatspec.AnyFlatSpec
88
import org.scalatest.matchers.should.Matchers
99
import org.testcontainers.containers.Network
10+
import org.testcontainers.utility.DockerImageName
1011

1112
import java.util.Properties
1213
import scala.collection.JavaConverters._
@@ -26,26 +27,26 @@ class SchemaRegistrySpec extends AnyFlatSpec with ForAllTestContainer with Match
2627
//a way to communicate containers
2728
val network: Network = Network.newNetwork()
2829

29-
val kafkaContainer: KafkaContainer = KafkaContainer.Def(kafkaVersion).createContainer()
30+
val kafkaContainer: KafkaContainer = KafkaContainer.Def(DockerImageName.parse(s"confluentinc/cp-kafka:$kafkaVersion")).createContainer()
3031
val schemaRegistryContainer: GenericContainer = SchemaRegistryContainer.Def(network, hostName, kafkaVersion).createContainer()
3132

3233
kafkaContainer.container
33-
.withNetwork(network)
34-
.withNetworkAliases(hostName)
35-
.withEnv(
36-
Map[String, String](
37-
"KAFKA_BROKER_ID" -> brokerId.toString,
38-
"KAFKA_HOST_NAME" -> hostName,
39-
"KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false"
40-
).asJava
41-
)
34+
.withNetwork(network)
35+
.withNetworkAliases(hostName)
36+
.withEnv(
37+
Map[String, String](
38+
"KAFKA_BROKER_ID" -> brokerId.toString,
39+
"KAFKA_HOST_NAME" -> hostName,
40+
"KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false"
41+
).asJava
42+
)
4243

4344
override val container: MultipleContainers = MultipleContainers(kafkaContainer, schemaRegistryContainer)
4445

4546
def getKafkaAddress: String = kafkaContainer.bootstrapServers
4647

4748
def getSchemaRegistryAddress: String =
48-
s"http://${schemaRegistryContainer.container.getHost}:${schemaRegistryContainer.container.getMappedPort(SchemaRegistryContainer.defaultSchemaPort)}"
49+
s"http://${schemaRegistryContainer.container.getHost}:${schemaRegistryContainer.container.getMappedPort(SchemaRegistryContainer.defaultSchemaPort)}"
4950

5051

5152
"Schema registry container" should "be started" in {

0 commit comments

Comments
 (0)