Skip to content

Commit 0cc9a72

Browse files
gudjonragnarGudjon Ragnar Brynjarsson
authored and
bstrausser
committed
fix(kafka): wait_for_logs in kafka container to reduce lib requirement (testcontainers#377)
Use `wait_for_logs` to wait for startup instead of waiting for successful connection via kafka-python. Also removes the dependency on kafka-python. Closes testcontainers#351 --------- Co-authored-by: Gudjon Ragnar Brynjarsson <[email protected]>
1 parent f83b8f1 commit 0cc9a72

File tree

3 files changed

+17
-23
lines changed

3 files changed

+17
-23
lines changed

modules/kafka/testcontainers/kafka/__init__.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
from io import BytesIO
44
from textwrap import dedent
55

6-
from kafka import KafkaConsumer
7-
from kafka.errors import KafkaError, NoBrokersAvailable, UnrecognizedBrokerVersion
86
from testcontainers.core.container import DockerContainer
97
from testcontainers.core.utils import raise_for_deprecated_parameter
10-
from testcontainers.core.waiting_utils import wait_container_is_ready
8+
from testcontainers.core.waiting_utils import wait_for_logs
119

1210

1311
class KafkaContainer(DockerContainer):
@@ -47,13 +45,6 @@ def get_bootstrap_server(self) -> str:
4745
port = self.get_exposed_port(self.port)
4846
return f"{host}:{port}"
4947

50-
@wait_container_is_ready(UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError)
51-
def _connect(self) -> None:
52-
bootstrap_server = self.get_bootstrap_server()
53-
consumer = KafkaConsumer(group_id="test", bootstrap_servers=[bootstrap_server])
54-
if not consumer.bootstrap_connected():
55-
raise KafkaError("Unable to connect with kafka container!")
56-
5748
def tc_start(self) -> None:
5849
host = self.get_container_host_ip()
5950
port = self.get_exposed_port(self.port)
@@ -78,13 +69,13 @@ def tc_start(self) -> None:
7869
)
7970
self.create_file(data, KafkaContainer.TC_START_SCRIPT)
8071

81-
def start(self) -> "KafkaContainer":
72+
def start(self, timeout=30) -> "KafkaContainer":
8273
script = KafkaContainer.TC_START_SCRIPT
8374
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
8475
self.with_command(command)
8576
super().start()
8677
self.tc_start()
87-
self._connect()
78+
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
8879
return self
8980

9081
def create_file(self, content: bytes, path: str) -> None:

poetry.lock

+12-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ influxdb = { version = "*", optional = true }
7171
influxdb-client = { version = "*", optional = true }
7272
kubernetes = { version = "*", optional = true }
7373
pyyaml = { version = "*", optional = true }
74-
kafka-python = { version = "*", optional = true }
7574
python-keycloak = { version = "*", optional = true }
7675
boto3 = { version = "*", optional = true }
7776
minio = { version = "*", optional = true }
@@ -94,7 +93,7 @@ elasticsearch = []
9493
google = ["google-cloud-pubsub"]
9594
influxdb = ["influxdb", "influxdb-client"]
9695
k3s = ["kubernetes", "pyyaml"]
97-
kafka = ["kafka-python"]
96+
kafka = []
9897
keycloak = ["python-keycloak"]
9998
localstack = ["boto3"]
10099
minio = ["minio"]
@@ -123,7 +122,7 @@ anyio = "^4.3.0"
123122
psycopg2-binary = "*"
124123
pg8000 = "*"
125124
sqlalchemy = "*"
126-
125+
kafka-python = "^2.0.2"
127126

128127
[[tool.poetry.source]]
129128
name = "PyPI"

0 commit comments

Comments
 (0)