Skip to content

Commit 6c7cd7e

Browse files
authored
Merge pull request #328 from php-enqueue/amqp-fix
[amqp] fix signal handler if consume called from consume
2 parents 1910ca5 + 3dacfee commit 6c7cd7e

File tree

2 files changed

+8
-18
lines changed

2 files changed

+8
-18
lines changed

pkg/amqp-bunny/AmqpContext.php

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5555
*/
5656
private $subscribers;
5757

58-
/**
59-
* @var SignalSocketHelper
60-
*/
61-
private $signalSocketHandler;
62-
6358
/**
6459
* Callable must return instance of \Bunny\Channel once called.
6560
*
@@ -85,7 +80,6 @@ public function __construct($bunnyChannel, $config = [])
8580

8681
$this->buffer = new Buffer();
8782
$this->subscribers = [];
88-
$this->signalSocketHandler = new SignalSocketHelper();
8983
}
9084

9185
/**
@@ -396,18 +390,19 @@ public function consume($timeout = 0)
396390
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
397391
}
398392

399-
$this->signalSocketHandler->beforeSocket();
393+
$signalHandler = new SignalSocketHelper();
394+
$signalHandler->beforeSocket();
400395

401396
try {
402397
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
403398
} catch (ClientException $e) {
404-
if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) {
399+
if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) {
405400
return;
406401
}
407402

408403
throw $e;
409404
} finally {
410-
$this->signalSocketHandler->afterSocket();
405+
$signalHandler->afterSocket();
411406
}
412407
}
413408

pkg/amqp-lib/AmqpContext.php

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,6 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5757
*/
5858
private $subscribers;
5959

60-
/**
61-
* @var SignalSocketHelper
62-
*/
63-
private $signalSocketHandler;
64-
6560
/**
6661
* @param AbstractConnection $connection
6762
* @param array $config
@@ -78,7 +73,6 @@ public function __construct(AbstractConnection $connection, $config = [])
7873
$this->connection = $connection;
7974
$this->buffer = new Buffer();
8075
$this->subscribers = [];
81-
$this->signalSocketHandler = new SignalSocketHelper();
8276
}
8377

8478
/**
@@ -390,7 +384,8 @@ public function consume($timeout = 0)
390384
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
391385
}
392386

393-
$this->signalSocketHandler->beforeSocket();
387+
$signalHandler = new SignalSocketHelper();
388+
$signalHandler->beforeSocket();
394389

395390
try {
396391
while (true) {
@@ -413,13 +408,13 @@ public function consume($timeout = 0)
413408
} catch (AMQPTimeoutException $e) {
414409
} catch (StopBasicConsumptionException $e) {
415410
} catch (AMQPIOWaitException $e) {
416-
if ($this->signalSocketHandler->wasThereSignal()) {
411+
if ($signalHandler->wasThereSignal()) {
417412
return;
418413
}
419414

420415
throw $e;
421416
} finally {
422-
$this->signalSocketHandler->afterSocket();
417+
$signalHandler->afterSocket();
423418
}
424419
}
425420

0 commit comments

Comments
 (0)