diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 6270b2230..7856cd4cb 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -35,6 +35,11 @@ class RdKafkaContext implements Context */ private $conf; + /** + * @var TopicConf + */ + private $topicConf; + /** * @var Producer */ @@ -75,7 +80,7 @@ public function createMessage(string $body = '', array $properties = [], array $ */ public function createTopic(string $topicName): Topic { - return new RdKafkaTopic($topicName); + return new RdKafkaTopic($topicName, $this->getTopicConf()); } /** @@ -83,7 +88,7 @@ public function createTopic(string $topicName): Topic */ public function createQueue(string $queueName): Queue { - return new RdKafkaTopic($queueName); + return new RdKafkaTopic($queueName, $this->getTopicConf()); } public function createTemporaryQueue(): Queue @@ -176,21 +181,30 @@ private function getProducer(): VendorProducer return $this->producer; } - private function getConf(): Conf + private function getTopicConf(): TopicConf { - if (null === $this->conf) { - $topicConf = new TopicConf(); + if (null === $this->topicConf) { + $this->topicConf = new TopicConf(); if (isset($this->config['topic']) && is_array($this->config['topic'])) { foreach ($this->config['topic'] as $key => $value) { - $topicConf->set($key, $value); + $this->topicConf->set($key, $value); } } if (isset($this->config['partitioner'])) { - $topicConf->setPartitioner($this->config['partitioner']); + $this->topicConf->setPartitioner($this->config['partitioner']); } + } + + return $this->topicConf; + } + + private function getConf(): Conf + { + if (null === $this->conf) { + $this->conf = new Conf(); if (isset($this->config['global']) && is_array($this->config['global'])) { @@ -214,8 +228,6 @@ private function getConf(): Conf if (isset($this->config['stats_cb'])) { $this->conf->setStatsCb($this->config['stats_cb']); } - - $this->conf->setDefaultTopicConf($topicConf); } return $this->conf; diff --git a/pkg/rdkafka/RdKafkaTopic.php b/pkg/rdkafka/RdKafkaTopic.php index a7bde1021..c132de364 100644 --- a/pkg/rdkafka/RdKafkaTopic.php +++ b/pkg/rdkafka/RdKafkaTopic.php @@ -30,9 +30,10 @@ class RdKafkaTopic implements Topic, Queue */ private $key; - public function __construct(string $name) + public function __construct(string $name, ?TopicConf $conf = null) { $this->name = $name; + $this->conf = $conf; } public function getTopicName(): string diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index 7cc601aef..8b3909838 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -145,8 +145,7 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic() $producer = new RdKafkaProducer($kafkaProducer, $serializer); - $topic = new RdKafkaTopic('theQueueName'); - $topic->setConf($conf); + $topic = new RdKafkaTopic('theQueueName', $conf); $producer->send($topic, new RdKafkaMessage()); }