-
Notifications
You must be signed in to change notification settings - Fork 440
DBAL Transport #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBAL Transport #54
Changes from 2 commits
192019a
674edd4
c840b68
d1b3c13
b836dc8
334820c
eb8bf6a
ec51705
783a956
9bb6e13
fd72fdb
75b8bbe
abcf56c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
*~ | ||
/composer.lock | ||
/composer.phar | ||
/phpunit.xml | ||
/vendor/ | ||
/.idea/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
sudo: false | ||
|
||
git: | ||
depth: 1 | ||
|
||
language: php | ||
|
||
php: | ||
- '5.6' | ||
- '7.0' | ||
|
||
cache: | ||
directories: | ||
- $HOME/.composer/cache | ||
|
||
install: | ||
- composer self-update | ||
- composer install --prefer-source | ||
|
||
script: | ||
- vendor/bin/phpunit --exclude-group=functional |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
<?php | ||
namespace Enqueue\Dbal\Client; | ||
|
||
use Enqueue\Client\Config; | ||
use Enqueue\Client\DriverInterface; | ||
use Enqueue\Client\Message; | ||
use Enqueue\Client\MessagePriority; | ||
use Enqueue\Client\Meta\QueueMetaRegistry; | ||
use Enqueue\Dbal\DbalContext; | ||
use Enqueue\Dbal\DbalDestination; | ||
use Enqueue\Dbal\DbalMessage; | ||
use Enqueue\Psr\PsrMessage; | ||
use Psr\Log\LoggerInterface; | ||
|
||
class DbalDriver implements DriverInterface | ||
{ | ||
/** | ||
* @var DbalContext | ||
*/ | ||
private $context; | ||
|
||
/** | ||
* @var Config | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* @var QueueMetaRegistry | ||
*/ | ||
private $queueMetaRegistry; | ||
|
||
/** | ||
* @param DbalContext $context | ||
* @param Config $config | ||
* @param QueueMetaRegistry $queueMetaRegistry | ||
*/ | ||
public function __construct(DbalContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) | ||
{ | ||
$this->context = $context; | ||
$this->config = $config; | ||
$this->queueMetaRegistry = $queueMetaRegistry; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
* | ||
* @return DbalMessage | ||
*/ | ||
public function createTransportMessage(Message $message) | ||
{ | ||
$properties = $message->getProperties(); | ||
|
||
$headers = $message->getHeaders(); | ||
$headers['content_type'] = $message->getContentType(); | ||
|
||
$transportMessage = $this->context->createMessage(); | ||
$transportMessage->setBody($message->getBody()); | ||
$transportMessage->setHeaders($headers); | ||
$transportMessage->setProperties($properties); | ||
$transportMessage->setMessageId($message->getMessageId()); | ||
$transportMessage->setTimestamp($message->getTimestamp()); | ||
$transportMessage->setDelay($message->getDelay()); | ||
|
||
return $transportMessage; | ||
} | ||
|
||
/** | ||
* @param DbalMessage $message | ||
* | ||
* {@inheritdoc} | ||
*/ | ||
public function createClientMessage(PsrMessage $message) | ||
{ | ||
$clientMessage = new Message(); | ||
|
||
$clientMessage->setBody($message->getBody()); | ||
$clientMessage->setHeaders($message->getHeaders()); | ||
$clientMessage->setProperties($message->getProperties()); | ||
|
||
$clientMessage->setContentType($message->getHeader('content_type')); | ||
$clientMessage->setMessageId($message->getMessageId()); | ||
$clientMessage->setTimestamp($message->getTimestamp()); | ||
$clientMessage->setPriority(MessagePriority::NORMAL); | ||
$clientMessage->setDelay($message->getDelay()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. setReplyTo |
||
|
||
return $clientMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToRouter(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { | ||
throw new \LogicException('Topic name parameter is required but is not set'); | ||
} | ||
|
||
$topic = $this->createRouterTopic(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better to use $queue = $this->createQueue($this->config->getRouterQueueName()); |
||
$transportMessage = $this->createTransportMessage($message); | ||
|
||
$this->context->createProducer()->send($topic, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToProcessor(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { | ||
throw new \LogicException('Processor name parameter is required but is not set'); | ||
} | ||
|
||
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { | ||
throw new \LogicException('Queue name parameter is required but is not set'); | ||
} | ||
|
||
$transportMessage = $this->createTransportMessage($message); | ||
$destination = $this->createQueue($queueName); | ||
|
||
$this->context->createProducer()->send($destination, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createQueue($queueName) | ||
{ | ||
return $this->context->createQueue($this->config->createTransportQueueName($queueName)); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function setupBroker(LoggerInterface $logger = null) | ||
{ | ||
// TODO: Implement setupBroker() method. | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getConfig() | ||
{ | ||
return $this->config; | ||
} | ||
|
||
/** | ||
* @return DbalDestination | ||
*/ | ||
private function createRouterTopic() | ||
{ | ||
return $this->context->createTopic( | ||
$this->config->createTransportQueueName($this->config->getRouterTopicName()) | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
<?php | ||
namespace Enqueue\Dbal; | ||
|
||
use Doctrine\Common\Persistence\ManagerRegistry; | ||
use Doctrine\DBAL\Connection; | ||
use Enqueue\Psr\PsrConnectionFactory; | ||
|
||
class DbalConnectionFactory implements PsrConnectionFactory | ||
{ | ||
/** | ||
* @var ManagerRegistry | ||
*/ | ||
private $registry; | ||
|
||
/** | ||
* @var array | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* $config = [ | ||
* 'connectionName' => Dbal connection name. | ||
* 'tableName' => Database table name. | ||
* 'pollingInterval' => msec How often query for new messages | ||
* 'lazy' => bool Use lazy database connection | ||
* ]. | ||
* | ||
* @param $config | ||
*/ | ||
public function __construct(ManagerRegistry $registry, array $config = []) | ||
{ | ||
$this->config = array_replace([ | ||
'connectionName' => null, | ||
'lazy' => true, | ||
], $config); | ||
|
||
$this->registry = $registry; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
* | ||
* @return DbalContext | ||
*/ | ||
public function createContext() | ||
{ | ||
if ($this->config['lazy']) { | ||
return new DbalContext(function () { | ||
return $this->establishConnection(); | ||
}, $this->config); | ||
} | ||
|
||
return new DbalContext($this->establishConnection(), $this->config); | ||
} | ||
|
||
/** | ||
* @return Connection | ||
*/ | ||
private function establishConnection() | ||
{ | ||
return $this->registry->getConnection($this->config['connectionName']); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we should call connect at this stage |
||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function close() | ||
{ | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replyTo,correlationId