-
Notifications
You must be signed in to change notification settings - Fork 440
Compatibility with Phprdkafka 4.0 #959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,7 @@ class RdKafkaContext implements Context | |
private $conf; | ||
|
||
/** | ||
* @var Producer | ||
* @var RdKafkaProducer | ||
*/ | ||
private $producer; | ||
|
||
|
@@ -50,9 +50,6 @@ class RdKafkaContext implements Context | |
*/ | ||
private $rdKafkaConsumers; | ||
|
||
/** | ||
* @param array $config | ||
*/ | ||
public function __construct(array $config) | ||
{ | ||
$this->config = $config; | ||
|
@@ -96,7 +93,23 @@ public function createTemporaryQueue(): Queue | |
*/ | ||
public function createProducer(): Producer | ||
{ | ||
return new RdKafkaProducer($this->getProducer(), $this->getSerializer()); | ||
if (!isset($this->producer)) { | ||
$producer = new VendorProducer($this->getConf()); | ||
|
||
if (isset($this->config['log_level'])) { | ||
$producer->setLogLevel($this->config['log_level']); | ||
} | ||
|
||
$this->producer = new RdKafkaProducer($producer, $this->getSerializer()); | ||
|
||
// Once created RdKafkaProducer can store messages internally that need to be delivered before PHP shuts | ||
// down. Otherwise, we are bound to lose messages in transit. | ||
// Note that it is generally preferable to call "close" method explicitly before shutdown starts, since | ||
// otherwise we might not have access to some objects, like database connections. | ||
register_shutdown_function([$this->producer, 'flush'], $this->config['shutdown_timeout'] ?? -1); | ||
} | ||
|
||
return $this->producer; | ||
} | ||
|
||
/** | ||
|
@@ -139,6 +152,11 @@ public function close(): void | |
foreach ($kafkaConsumers as $kafkaConsumer) { | ||
$kafkaConsumer->unsubscribe(); | ||
} | ||
|
||
// Compatibility with phprdkafka 4.0. | ||
if (isset($this->producer)) { | ||
$this->producer->flush($this->config['shutdown_timeout'] ?? -1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe throw an error or log something if the return value is not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As what we discussed: throwing an exception here would change the behavior and make it necessary for wrapping libraries (like enqueue-bundle) to catch it, so I've left it out. However, I do agree that it is something that this package eventually should do, since otherwise potential delivery errors will be hidden from end user. |
||
} | ||
} | ||
|
||
public function createSubscriptionConsumer(): SubscriptionConsumer | ||
|
@@ -163,19 +181,6 @@ public static function getLibrdKafkaVersion(): string | |
return "$major.$minor.$patch"; | ||
} | ||
|
||
private function getProducer(): VendorProducer | ||
{ | ||
if (null === $this->producer) { | ||
$this->producer = new VendorProducer($this->getConf()); | ||
|
||
if (isset($this->config['log_level'])) { | ||
$this->producer->setLogLevel($this->config['log_level']); | ||
} | ||
} | ||
|
||
return $this->producer; | ||
} | ||
|
||
private function getConf(): Conf | ||
{ | ||
if (null === $this->conf) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,15 +7,15 @@ | |
"license": "MIT", | ||
"require": { | ||
"php": "^7.1.3", | ||
"ext-rdkafka": "^3.0.3", | ||
"ext-rdkafka": "^3.0.3|^4.0", | ||
"queue-interop/queue-interop": "^0.8" | ||
}, | ||
"require-dev": { | ||
"phpunit/phpunit": "~7.5", | ||
"enqueue/test": "0.10.x-dev", | ||
"enqueue/null": "0.10.x-dev", | ||
"queue-interop/queue-spec": "^0.6", | ||
"kwn/php-rdkafka-stubs": "^1.0.2" | ||
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0" | ||
}, | ||
"support": { | ||
"email": "[email protected]", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is
setLogLevel
ofRdKafka\Producer
be aware that it has been deprecated. Just havinglog_level
inRdKafka\Conf
is fine ✌️@Steveb-p i do tend to forget which producer is which in enqueue, forgive me if my assumption is not correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nick-zh you're correct, this is directly calling
setLogLevel
on Kafka Producer instance.At this point I'm not really going to remove this deprecation, since in general PHP frameworks and applications should only log the deprecation (similarly to the default topic case). I'll address this in a separate PR.