|
| 1 | +import tarfile |
| 2 | +import time |
| 3 | +from io import BytesIO |
| 4 | +from textwrap import dedent |
| 5 | + |
| 6 | +from kafka import KafkaConsumer |
| 7 | +from kafka.errors import KafkaError |
| 8 | + |
| 9 | +from testcontainers.core.container import DockerContainer |
| 10 | +from testcontainers.core.waiting_utils import wait_container_is_ready |
| 11 | + |
| 12 | + |
| 13 | +class KafkaContainer(DockerContainer): |
| 14 | + KAFKA_PORT = 9093 |
| 15 | + TC_START_SCRIPT = '/tc-start.sh' |
| 16 | + |
| 17 | + def __init__(self, image="confluentinc/cp-kafka:5.4.3", port_to_expose=KAFKA_PORT): |
| 18 | + super(KafkaContainer, self).__init__(image) |
| 19 | + self.port_to_expose = port_to_expose |
| 20 | + self.with_exposed_ports(self.port_to_expose) |
| 21 | + listeners = 'PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9092'.format(port_to_expose) |
| 22 | + self.with_env('KAFKA_LISTENERS', listeners) |
| 23 | + self.with_env('KAFKA_LISTENER_SECURITY_PROTOCOL_MAP', |
| 24 | + 'BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT') |
| 25 | + self.with_env('KAFKA_INTER_BROKER_LISTENER_NAME', 'BROKER') |
| 26 | + |
| 27 | + self.with_env('KAFKA_BROKER_ID', '1') |
| 28 | + self.with_env('KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR', '1') |
| 29 | + self.with_env('KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS', '1') |
| 30 | + self.with_env('KAFKA_LOG_FLUSH_INTERVAL_MESSAGES', '10000000') |
| 31 | + self.with_env('KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS', '0') |
| 32 | + |
| 33 | + def get_bootstrap_server(self): |
| 34 | + host = self.get_container_host_ip() |
| 35 | + port = self.get_exposed_port(self.port_to_expose) |
| 36 | + return '{}:{}'.format(host, port) |
| 37 | + |
| 38 | + @wait_container_is_ready() |
| 39 | + def _connect(self): |
| 40 | + bootstrap_server = self.get_bootstrap_server() |
| 41 | + consumer = KafkaConsumer(group_id='test', bootstrap_servers=[bootstrap_server]) |
| 42 | + if not consumer.topics(): |
| 43 | + raise KafkaError("Unable to connect with kafka container!") |
| 44 | + |
| 45 | + def tc_start(self): |
| 46 | + port = self.get_exposed_port(self.port_to_expose) |
| 47 | + listeners = 'PLAINTEXT://localhost:{},BROKER://$(hostname -i):9092'.format(port) |
| 48 | + data = ( |
| 49 | + dedent( |
| 50 | + """ |
| 51 | + #!/bin/bash |
| 52 | + echo 'clientPort=2181' > zookeeper.properties |
| 53 | + echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties |
| 54 | + echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties |
| 55 | + zookeeper-server-start zookeeper.properties & |
| 56 | + export KAFKA_ZOOKEEPER_CONNECT='localhost:2181' |
| 57 | + export KAFKA_ADVERTISED_LISTENERS={} |
| 58 | + . /etc/confluent/docker/bash-config |
| 59 | + /etc/confluent/docker/configure |
| 60 | + /etc/confluent/docker/launch |
| 61 | + """.format(listeners) |
| 62 | + ) |
| 63 | + .strip() |
| 64 | + .encode('utf-8') |
| 65 | + ) |
| 66 | + self.create_file(data, KafkaContainer.TC_START_SCRIPT) |
| 67 | + |
| 68 | + def start(self): |
| 69 | + script = KafkaContainer.TC_START_SCRIPT |
| 70 | + command = 'sh -c "while [ ! -f {} ]; do sleep 0.1; done; sh {}"'.format(script, script) |
| 71 | + self.with_command(command) |
| 72 | + super().start() |
| 73 | + self.tc_start() |
| 74 | + self._connect() |
| 75 | + return self |
| 76 | + |
| 77 | + def create_file(self, content: bytes, path: str): |
| 78 | + with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar: |
| 79 | + tarinfo = tarfile.TarInfo(name=path) |
| 80 | + tarinfo.size = len(content) |
| 81 | + tarinfo.mtime = time.time() |
| 82 | + tar.addfile(tarinfo, BytesIO(content)) |
| 83 | + archive.seek(0) |
| 84 | + self.get_wrapped_container().put_archive("/", archive) |
0 commit comments