Skip to content

[amqp] Add ability to choose what receive method to use: basic_get or basic_consume. #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class AmqpConnectionFactory implements PsrConnectionFactory
* 'connect_timeout' => 'Connection timeout. Note: 0 or greater seconds. May be fractional.',
* 'persisted' => 'bool, Whether it use single persisted connection or open a new one for every context',
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* 'pre_fetch_count' => 'Controls how many messages could be prefetched',
* 'pre_fetch_size' => 'Controls how many messages could be prefetched',
* 'receive_method' => 'Could be either basic_get or basic_consume',
* ]
*
* or
Expand All @@ -50,6 +53,22 @@ public function __construct($config = 'amqp://')
}

$this->config = array_replace($this->defaultConfig(), $config);

$supportedMethods = ['basic_get', 'basic_consume'];
if (false == in_array($this->config['receive_method'], $supportedMethods, true)) {
throw new \LogicException(sprintf(
'Invalid "receive_method" option value "%s". It could be only "%s"',
$this->config['receive_method'],
implode('", "', $supportedMethods)
));
}

if ('basic_consume' == $this->config['receive_method']) {
if (false == (version_compare(phpversion('amqp'), '1.9.1', '>=') || phpversion('amqp') == '1.9.1-dev')) {
// @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281
throw new \LogicException('The "basic_consume" method does not work on amqp extension prior 1.9.1 version.');
}
}
}

/**
Expand All @@ -62,10 +81,10 @@ public function createContext()
if ($this->config['lazy']) {
return new AmqpContext(function () {
return $this->createExtContext($this->establishConnection());
});
}, $this->config['receive_method']);
}

return new AmqpContext($this->createExtContext($this->establishConnection()));
return new AmqpContext($this->createExtContext($this->establishConnection()), $this->config['receive_method']);
}

/**
Expand Down Expand Up @@ -171,6 +190,7 @@ private function defaultConfig()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
];
}
}
138 changes: 85 additions & 53 deletions pkg/amqp-ext/AmqpConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ class AmqpConsumer implements PsrConsumer
*/
private $isInit;

/**
* @var string
*/
private $receiveMethod;

/**
* @param AmqpContext $context
* @param AmqpQueue $queue
* @param Buffer $buffer
* @param string $receiveMethod
*/
public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buffer)
public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buffer, $receiveMethod)
{
$this->queue = $queue;
$this->context = $context;
$this->buffer = $buffer;
$this->receiveMethod = $receiveMethod;

$this->isInit = false;
}
Expand All @@ -64,61 +71,15 @@ public function getQueue()
*/
public function receive($timeout = 0)
{
// @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281
$end = microtime(true) + ($timeout / 1000);
if ('basic_get' == $this->receiveMethod) {
return $this->receiveBasicGet($timeout);
}

while (0 === $timeout || microtime(true) < $end) {
if ($message = $this->receiveNoWait()) {
return $message;
}
if ('basic_consume' == $this->receiveMethod) {
return $this->receiveBasicConsume($timeout);
}

// if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
// return $message;
// }
//
// /** @var \AMQPQueue $extQueue */
// $extConnection = $this->getExtQueue()->getChannel()->getConnection();
//
// $originalTimeout = $extConnection->getReadTimeout();
// try {
// $extConnection->setReadTimeout($timeout / 1000);
//
// if (false == $this->isInit) {
// $this->getExtQueue()->consume(null, AMQP_NOPARAM);
//
// $this->isInit = true;
// }
//
// /** @var AmqpMessage|null $message */
// $message = null;
//
// $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
// $message = $this->convertMessage($extEnvelope);
// $message->setConsumerTag($q->getConsumerTag());
//
// if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
// return false;
// }
//
// // not our message, put it to buffer and continue.
// $this->buffer->push($q->getConsumerTag(), $message);
//
// $message = null;
//
// return true;
// }, AMQP_JUST_CONSUME);
//
// return $message;
// } catch (\AMQPQueueException $e) {
// if ('Consumer timeout exceed' == $e->getMessage()) {
// return null;
// }
//
// throw $e;
// } finally {
// $extConnection->setReadTimeout($originalTimeout);
// }
throw new \LogicException('The "receiveMethod" is not supported');
}

/**
Expand Down Expand Up @@ -160,6 +121,77 @@ public function reject(PsrMessage $message, $requeue = false)
);
}

/**
* @param int $timeout
*
* @return AmqpMessage|null
*/
private function receiveBasicGet($timeout)
{
$end = microtime(true) + ($timeout / 1000);

while (0 === $timeout || microtime(true) < $end) {
if ($message = $this->receiveNoWait()) {
return $message;
}
}
}

/**
* @param int $timeout
*
* @return AmqpMessage|null
*/
private function receiveBasicConsume($timeout)
{
if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
return $message;
}

/** @var \AMQPQueue $extQueue */
$extConnection = $this->getExtQueue()->getChannel()->getConnection();

$originalTimeout = $extConnection->getReadTimeout();
try {
$extConnection->setReadTimeout($timeout / 1000);

if (false == $this->isInit) {
$this->getExtQueue()->consume(null, AMQP_NOPARAM);

$this->isInit = true;
}

/** @var AmqpMessage|null $message */
$message = null;

$this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
$message = $this->convertMessage($extEnvelope);
$message->setConsumerTag($q->getConsumerTag());

if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
return false;
}

// not our message, put it to buffer and continue.
$this->buffer->push($q->getConsumerTag(), $message);

$message = null;

return true;
}, AMQP_JUST_CONSUME);

return $message;
} catch (\AMQPQueueException $e) {
if ('Consumer timeout exceed' == $e->getMessage()) {
return null;
}

throw $e;
} finally {
$extConnection->setReadTimeout($originalTimeout);
}
}

/**
* @param \AMQPEnvelope $extEnvelope
*
Expand Down
14 changes: 11 additions & 3 deletions pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@ class AmqpContext implements PsrContext
*/
private $buffer;

/**
* @var string
*/
private $receiveMethod;

/**
* Callable must return instance of \AMQPChannel once called.
*
* @param \AMQPChannel|callable $extChannel
* @param string $receiveMethod
*/
public function __construct($extChannel)
public function __construct($extChannel, $receiveMethod)
{
$this->receiveMethod = $receiveMethod;

if ($extChannel instanceof \AMQPChannel) {
$this->extChannel = $extChannel;
} elseif (is_callable($extChannel)) {
Expand Down Expand Up @@ -181,10 +189,10 @@ public function createConsumer(PsrDestination $destination)
$queue = $this->createTemporaryQueue();
$this->bind($destination, $queue);

return new AmqpConsumer($this, $queue, $this->buffer);
return new AmqpConsumer($this, $queue, $this->buffer, $this->receiveMethod);
}

return new AmqpConsumer($this, $destination, $this->buffer);
return new AmqpConsumer($this, $destination, $this->buffer, $this->receiveMethod);
}

public function close()
Expand Down
17 changes: 17 additions & 0 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public function testThrowIfDsnCouldNotBeParsed()
new AmqpConnectionFactory('amqp://:@/');
}

public function testThrowIfReceiveMenthodIsInvalid()
{
$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Invalid "receive_method" option value "invalidMethod". It could be only "basic_get", "basic_consume"');

new AmqpConnectionFactory(['receive_method' => 'invalidMethod']);
}

/**
* @dataProvider provideConfigs
*
Expand Down Expand Up @@ -67,6 +75,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -87,6 +96,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -105,6 +115,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -123,6 +134,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -141,6 +153,7 @@ public static function provideConfigs()
'lazy' => '',
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -159,6 +172,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -177,6 +191,7 @@ public static function provideConfigs()
'lazy' => false,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

Expand All @@ -195,6 +210,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
'receive_method' => 'basic_get',
],
];

Expand All @@ -213,6 +229,7 @@ public static function provideConfigs()
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
'receive_method' => 'basic_get',
],
];
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments()
new AmqpConsumer(
$this->createContext(),
new AmqpQueue('aName'),
new Buffer()
new Buffer(),
'basic_get'
);
}

Expand Down
Loading