1+ import base64
12import tarfile
23import time
4+ import uuid
35from io import BytesIO
46from textwrap import dedent
57
8+ from typing_extensions import Self
9+
610from testcontainers .core .container import DockerContainer
711from testcontainers .core .utils import raise_for_deprecated_parameter
12+ from testcontainers .core .version import ComparableVersion
813from testcontainers .core .waiting_utils import wait_for_logs
914from testcontainers .kafka ._redpanda import RedpandaContainer
1015
@@ -29,15 +34,22 @@ class KafkaContainer(DockerContainer):
2934 """
3035
3136 TC_START_SCRIPT = "/tc-start.sh"
37+ MIN_KRAFT_TAG = "7.0.0"
3238
3339 def __init__ (self , image : str = "confluentinc/cp-kafka:7.6.0" , port : int = 9093 , ** kwargs ) -> None :
3440 raise_for_deprecated_parameter (kwargs , "port_to_expose" , "port" )
3541 super ().__init__ (image , ** kwargs )
3642 self .port = port
43+ self .kraft_enabled = False
44+ self .wait_for = r".*\[KafkaServer id=\d+\] started.*"
45+ self .boot_command = ""
46+ self .cluster_id = self ._random_uuid ()
47+ self .listeners = f"PLAINTEXT://0.0.0.0:{ self .port } ,BROKER://0.0.0.0:9092"
48+ self .security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
49+
3750 self .with_exposed_ports (self .port )
38- listeners = f"PLAINTEXT://0.0.0.0:{ self .port } ,BROKER://0.0.0.0:9092"
39- self .with_env ("KAFKA_LISTENERS" , listeners )
40- self .with_env ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT" )
51+ self .with_env ("KAFKA_LISTENERS" , self .listeners )
52+ self .with_env ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , self .security_protocol_map )
4153 self .with_env ("KAFKA_INTER_BROKER_LISTENER_NAME" , "BROKER" )
4254
4355 self .with_env ("KAFKA_BROKER_ID" , "1" )
@@ -46,6 +58,82 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093,
4658 self .with_env ("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES" , "10000000" )
4759 self .with_env ("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS" , "0" )
4860
61+ def with_kraft (self ) -> Self :
62+ self ._verify_min_kraft_version ()
63+ self .kraft_enabled = True
64+ return self
65+
66+ def _verify_min_kraft_version (self ):
67+ actual_version = self .image .split (":" )[- 1 ]
68+
69+ if ComparableVersion (actual_version ) < self .MIN_KRAFT_TAG :
70+ raise ValueError (
71+ f"Provided Confluent Platform's version { actual_version } "
72+ f"is not supported in Kraft mode"
73+ f" (must be { self .MIN_KRAFT_TAG } or above)"
74+ )
75+
76+ def with_cluster_id (self , cluster_id : str ) -> Self :
77+ self .cluster_id = cluster_id
78+ return self
79+
80+ @classmethod
81+ def _random_uuid (cls ):
82+ uuid_value = uuid .uuid4 ()
83+ uuid_bytes = uuid_value .bytes
84+ base64_encoded_uuid = base64 .b64encode (uuid_bytes )
85+
86+ return base64_encoded_uuid .decode ()
87+
88+ def configure (self ):
89+ if self .kraft_enabled :
90+ self ._configure_kraft ()
91+ else :
92+ self ._configure_zookeeper ()
93+
94+ def _configure_kraft (self ) -> None :
95+ self .wait_for = r".*Kafka Server started.*"
96+
97+ self .with_env ("CLUSTER_ID" , self .cluster_id )
98+ self .with_env ("KAFKA_NODE_ID" , 1 )
99+ self .with_env (
100+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ,
101+ f"{ self .security_protocol_map } ,CONTROLLER:PLAINTEXT" ,
102+ )
103+ self .with_env (
104+ "KAFKA_LISTENERS" ,
105+ f"{ self .listeners } ,CONTROLLER://0.0.0.0:9094" ,
106+ )
107+ self .with_env ("KAFKA_PROCESS_ROLES" , "broker,controller" )
108+
109+ network_alias = self ._get_network_alias ()
110+ controller_quorum_voters = f"1@{ network_alias } :9094"
111+ self .with_env ("KAFKA_CONTROLLER_QUORUM_VOTERS" , controller_quorum_voters )
112+ self .with_env ("KAFKA_CONTROLLER_LISTENER_NAMES" , "CONTROLLER" )
113+
114+ self .boot_command = f"""
115+ sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
116+ echo 'kafka-storage format --ignore-formatted -t { self .cluster_id } -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
117+ """
118+
119+ def _get_network_alias (self ):
120+ if self ._network :
121+ return next (
122+ iter (self ._network_aliases or [self ._network .name or self ._kwargs .get ("network" , [])]),
123+ None ,
124+ )
125+
126+ return "localhost"
127+
128+ def _configure_zookeeper (self ) -> None :
129+ self .boot_command = """
130+ echo 'clientPort=2181' > zookeeper.properties
131+ echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
132+ echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
133+ zookeeper-server-start zookeeper.properties &
134+ export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
135+ """
136+
49137 def get_bootstrap_server (self ) -> str :
50138 host = self .get_container_host_ip ()
51139 port = self .get_exposed_port (self .port )
@@ -59,11 +147,7 @@ def tc_start(self) -> None:
59147 dedent (
60148 f"""
61149 #!/bin/bash
62- echo 'clientPort=2181' > zookeeper.properties
63- echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
64- echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
65- zookeeper-server-start zookeeper.properties &
66- export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
150+ { self .boot_command }
67151 export KAFKA_ADVERTISED_LISTENERS={ listeners }
68152 . /etc/confluent/docker/bash-config
69153 /etc/confluent/docker/configure
@@ -78,10 +162,11 @@ def tc_start(self) -> None:
78162 def start (self , timeout = 30 ) -> "KafkaContainer" :
79163 script = KafkaContainer .TC_START_SCRIPT
80164 command = f'sh -c "while [ ! -f { script } ]; do sleep 0.1; done; sh { script } "'
165+ self .configure ()
81166 self .with_command (command )
82167 super ().start ()
83168 self .tc_start ()
84- wait_for_logs (self , r".*\[KafkaServer id=\d+\] started.*" , timeout = timeout )
169+ wait_for_logs (self , self . wait_for , timeout = timeout )
85170 return self
86171
87172 def create_file (self , content : bytes , path : str ) -> None :
0 commit comments