Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: php-enqueue/enqueue-dev
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 0.4.16
Choose a base ref
...
head repository: php-enqueue/enqueue-dev
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0.4.17
Choose a head ref
  • 7 commits
  • 6 files changed
  • 1 contributor

Commits on Jun 19, 2017

  1. [amqp] Fixes high CPU consumption when basic get is used

    - usleep 100ms was added
    makasim committed Jun 19, 2017
    Copy the full SHA
    522d257 View commit details
  2. Merge pull request #117 from php-enqueue/amqp-add-sleep-to-basic-get

    [amqp] Fixes high CPU consumption when basic get is used
    makasim authored Jun 19, 2017
    Copy the full SHA
    e4aee07 View commit details
  3. Copy the full SHA
    91c738a View commit details
  4. Copy the full SHA
    7430841 View commit details
  5. Merge pull request #118 from php-enqueue/bundle-add-amqp-receive-meth…

    …od-option
    
    [amqp] Add 'receive_method' to amqp transport factory.
    makasim authored Jun 19, 2017
    Copy the full SHA
    51a36c0 View commit details
  6. Merge pull request #119 from php-enqueue/simple-client-allow-processo…

    …rs-bind
    
    [simple-client] Allow processor instance bind.
    makasim authored Jun 19, 2017
    Copy the full SHA
    4fb33e3 View commit details
  7. Release 0.4.17

    makasim committed Jun 19, 2017

    Verified

    This commit was signed with the committer’s verified signature.
    makasim Max Kotliar
    Copy the full SHA
    855d974 View commit details
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Change Log

## [0.4.17](https://github.com/php-enqueue/enqueue-dev/tree/0.4.17) (2017-06-19)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.16...0.4.17)

- \[amqp\] Fixes high CPU consumption when basic get is used [\#117](https://github.com/php-enqueue/enqueue-dev/pull/117) ([makasim](https://github.com/makasim))

- \[RabbitMQ\] High resource usage in AmqpConsumer::receiveBasicGet\(\) [\#116](https://github.com/php-enqueue/enqueue-dev/issues/116)

- \[simple-client\] Allow processor instance bind. [\#119](https://github.com/php-enqueue/enqueue-dev/pull/119) ([makasim](https://github.com/makasim))
- \[amqp\] Add 'receive\_method' to amqp transport factory. [\#118](https://github.com/php-enqueue/enqueue-dev/pull/118) ([makasim](https://github.com/makasim))

## [0.4.16](https://github.com/php-enqueue/enqueue-dev/tree/0.4.16) (2017-06-16)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.15...0.4.16)

- ProducerV2 For SimpleClient [\#115](https://github.com/php-enqueue/enqueue-dev/pull/115) ([ASKozienko](https://github.com/ASKozienko))

## [0.4.15](https://github.com/php-enqueue/enqueue-dev/tree/0.4.15) (2017-06-14)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.14...0.4.15)

2 changes: 2 additions & 0 deletions pkg/amqp-ext/AmqpConsumer.php
Original file line number Diff line number Diff line change
@@ -134,6 +134,8 @@ private function receiveBasicGet($timeout)
if ($message = $this->receiveNoWait()) {
return $message;
}

usleep(100000); //100ms
}
}

5 changes: 5 additions & 0 deletions pkg/amqp-ext/Symfony/AmqpTransportFactory.php
Original file line number Diff line number Diff line change
@@ -85,6 +85,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
->booleanNode('lazy')
->defaultTrue()
->end()
->enumNode('receive_method')
->values(['basic_get', 'basic_consume'])
->defaultValue('basic_get')
->info('The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher')
->end()
;
}

43 changes: 43 additions & 0 deletions pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\Config\Definition\Processor;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
@@ -54,6 +55,7 @@ public function testShouldAllowAddConfiguration()
'vhost' => '/',
'persisted' => false,
'lazy' => true,
'receive_method' => 'basic_get',
], $config);
}

@@ -76,6 +78,47 @@ public function testShouldAllowAddConfigurationAsString()
'vhost' => '/',
'persisted' => false,
'lazy' => true,
'receive_method' => 'basic_get',
], $config);
}

public function testThrowIfInvalidReceiveMethodIsSet()
{
$transport = new AmqpTransportFactory();
$tb = new TreeBuilder();
$rootNode = $tb->root('foo');

$transport->addConfiguration($rootNode);
$processor = new Processor();

$this->expectException(InvalidConfigurationException::class);
$this->expectExceptionMessage('The value "anInvalidMethod" is not allowed for path "foo.receive_method". Permissible values: "basic_get", "basic_consume"');
$processor->process($tb->buildTree(), [[
'receive_method' => 'anInvalidMethod',
]]);
}

public function testShouldAllowChangeReceiveMethod()
{
$transport = new AmqpTransportFactory();
$tb = new TreeBuilder();
$rootNode = $tb->root('foo');

$transport->addConfiguration($rootNode);
$processor = new Processor();
$config = $processor->process($tb->buildTree(), [[
'receive_method' => 'basic_consume',
]]);

$this->assertEquals([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'lazy' => true,
'receive_method' => 'basic_consume',
], $config);
}

Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ public function testShouldAllowAddConfiguration()
'persisted' => false,
'delay_plugin_installed' => false,
'lazy' => true,
'receive_method' => 'basic_get',
], $config);
}

19 changes: 14 additions & 5 deletions pkg/simple-client/SimpleClient.php
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\Symfony\SqsTransportFactory;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
@@ -80,17 +81,25 @@ public function __construct($config)
}

/**
* @param string $topic
* @param string $processorName
* @param callback $processor
* @param string $topic
* @param string $processorName
* @param callable|PsrProcessor $processor
*/
public function bind($topic, $processorName, callable $processor)
public function bind($topic, $processorName, $processor)
{
if (is_callable($processor)) {
$processor = new CallbackProcessor($processor);
}

if (false == $processor instanceof PsrProcessor) {
throw new \LogicException('The processor must be either callable or instance of PsrProcessor');
}

$queueName = $this->getConfig()->getDefaultProcessorQueueName();

$this->getTopicMetaRegistry()->addProcessor($topic, $processorName);
$this->getQueueMetaRegistry()->addProcessor($queueName, $processorName);
$this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor));
$this->getProcessorRegistry()->add($processorName, $processor);
$this->getRouterProcessor()->add($topic, $queueName, $processorName);
}