-
Notifications
You must be signed in to change notification settings - Fork 440
Mongodb transport #430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mongodb transport #430
Changes from 3 commits
07ed3ba
4346e9f
83d4b85
2e8f931
7e9f62d
9cbbce8
947e00f
c17e79f
a7ea2f1
e924c23
e8de54a
f76e48d
aa3178f
f460e6c
41f2b47
a9fd835
368b608
0f8d1dc
41254f9
f011a8b
b128a65
e15e92f
3b8c997
1b23270
c407d24
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
*~ | ||
/composer.lock | ||
/composer.phar | ||
/phpunit.xml | ||
/vendor/ | ||
/.idea/ | ||
/examples/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
<?php | ||
|
||
namespace Enqueue\Mongodb\Client; | ||
|
||
use Enqueue\Client\Config; | ||
use Enqueue\Client\DriverInterface; | ||
use Enqueue\Client\Message; | ||
use Enqueue\Client\MessagePriority; | ||
use Enqueue\Client\Meta\QueueMetaRegistry; | ||
use Enqueue\Mongodb\MongodbContext; | ||
use Enqueue\Mongodb\MongodbMessage; | ||
use Interop\Queue\PsrMessage; | ||
use Psr\Log\LoggerInterface; | ||
use Psr\Log\NullLogger; | ||
|
||
class MongodbDriver implements DriverInterface | ||
{ | ||
/** | ||
* @var MongodbContext | ||
*/ | ||
private $context; | ||
|
||
/** | ||
* @var Config | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* @var QueueMetaRegistry | ||
*/ | ||
private $queueMetaRegistry; | ||
|
||
/** | ||
* @var array | ||
*/ | ||
private static $priorityMap = [ | ||
MessagePriority::VERY_LOW => 0, | ||
MessagePriority::LOW => 1, | ||
MessagePriority::NORMAL => 2, | ||
MessagePriority::HIGH => 3, | ||
MessagePriority::VERY_HIGH => 4, | ||
]; | ||
|
||
/** | ||
* @param MongodbContext $context | ||
* @param Config $config | ||
* @param QueueMetaRegistry $queueMetaRegistry | ||
*/ | ||
public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) | ||
{ | ||
$this->context = $context; | ||
$this->config = $config; | ||
$this->queueMetaRegistry = $queueMetaRegistry; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
* | ||
* @return MongodbMessage | ||
*/ | ||
public function createTransportMessage(Message $message) | ||
{ | ||
$properties = $message->getProperties(); | ||
|
||
$headers = $message->getHeaders(); | ||
$headers['content_type'] = $message->getContentType(); | ||
|
||
$transportMessage = $this->context->createMessage(); | ||
$transportMessage->setBody($message->getBody()); | ||
$transportMessage->setHeaders($headers); | ||
$transportMessage->setProperties($properties); | ||
$transportMessage->setMessageId($message->getMessageId()); | ||
$transportMessage->setTimestamp($message->getTimestamp()); | ||
$transportMessage->setDeliveryDelay($message->getDelay()); | ||
$transportMessage->setReplyTo($message->getReplyTo()); | ||
$transportMessage->setCorrelationId($message->getCorrelationId()); | ||
if (array_key_exists($message->getPriority(), self::$priorityMap)) { | ||
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]); | ||
} | ||
|
||
return $transportMessage; | ||
} | ||
|
||
/** | ||
* @param MongodbMessage $message | ||
* | ||
* {@inheritdoc} | ||
*/ | ||
public function createClientMessage(PsrMessage $message) | ||
{ | ||
$clientMessage = new Message(); | ||
|
||
$clientMessage->setBody($message->getBody()); | ||
$clientMessage->setHeaders($message->getHeaders()); | ||
$clientMessage->setProperties($message->getProperties()); | ||
|
||
$clientMessage->setContentType($message->getHeader('content_type')); | ||
$clientMessage->setMessageId($message->getMessageId()); | ||
$clientMessage->setTimestamp($message->getTimestamp()); | ||
$clientMessage->setDelay($message->getDeliveryDelay()); | ||
$clientMessage->setReplyTo($message->getReplyTo()); | ||
$clientMessage->setCorrelationId($message->getCorrelationId()); | ||
|
||
$priorityMap = array_flip(self::$priorityMap); | ||
$priority = array_key_exists($message->getPriority(), $priorityMap) ? | ||
$priorityMap[$message->getPriority()] : | ||
MessagePriority::NORMAL; | ||
$clientMessage->setPriority($priority); | ||
|
||
return $clientMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToRouter(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { | ||
throw new \LogicException('Topic name parameter is required but is not set'); | ||
} | ||
|
||
$queue = $this->createQueue($this->config->getRouterQueueName()); | ||
$transportMessage = $this->createTransportMessage($message); | ||
|
||
$this->context->createProducer()->send($queue, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToProcessor(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { | ||
throw new \LogicException('Processor name parameter is required but is not set'); | ||
} | ||
|
||
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { | ||
throw new \LogicException('Queue name parameter is required but is not set'); | ||
} | ||
|
||
$transportMessage = $this->createTransportMessage($message); | ||
$destination = $this->createQueue($queueName); | ||
|
||
$this->context->createProducer()->send($destination, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createQueue($queueName) | ||
{ | ||
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); | ||
|
||
return $this->context->createQueue($transportName); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function setupBroker(LoggerInterface $logger = null) | ||
{ | ||
$logger = $logger ?: new NullLogger(); | ||
$log = function ($text, ...$args) use ($logger) { | ||
$logger->debug(sprintf('[MongodbDriver] '.$text, ...$args)); | ||
}; | ||
$contextConfig = $this->context->getConfig(); | ||
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']); | ||
$this->context->createCollection(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getConfig() | ||
{ | ||
return $this->config; | ||
} | ||
|
||
/** | ||
* @return array | ||
*/ | ||
public static function getPriorityMap() | ||
{ | ||
return self::$priorityMap; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
<?php | ||
|
||
namespace Enqueue\Mongodb; | ||
|
||
use Interop\Queue\PsrConnectionFactory; | ||
use MongoDB\Client; | ||
|
||
class MongodbConnectionFactory implements PsrConnectionFactory | ||
{ | ||
/** | ||
* @var array | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Mongodb localhost with default credentials. | ||
* | ||
* $config = [ | ||
* 'uri' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/ | ||
* 'dbname' => 'enqueue', - database name. | ||
* 'collection_name' => 'enqueue' - collection name | ||
* 'polling_interval' => '1000', - How often query for new messages (milliseconds) | ||
* ] | ||
* | ||
* or | ||
* | ||
* mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue | ||
* | ||
* @param array|string|null $config | ||
*/ | ||
public function __construct($config = 'mongodb:') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you please add a docblcok with the information on possible configuration options |
||
{ | ||
if (empty($config)) { | ||
$config = $this->parseDsn('mongodb:'); | ||
} elseif (is_string($config)) { | ||
$config = $this->parseDsn($config); | ||
} elseif (is_array($config)) { | ||
} else { | ||
throw new \LogicException('The config must be either an array of options, a DSN string or null'); | ||
} | ||
$config = array_replace([ | ||
'uri' => 'mongodb://127.0.0.1/', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should add dbname and a collection here as well |
||
], $config); | ||
|
||
$this->config = $config; | ||
} | ||
|
||
public function createContext() | ||
{ | ||
$client = new Client($this->config['uri']); | ||
|
||
return new MongodbContext($client, $this->config); | ||
} | ||
|
||
public static function parseDsn($dsn) | ||
{ | ||
$parsedUrl = parse_url($dsn); | ||
if (false === $parsedUrl) { | ||
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); | ||
} | ||
if (empty($parsedUrl['scheme'])) { | ||
throw new \LogicException('Schema is empty'); | ||
} | ||
$supported = [ | ||
'mongodb' => true, | ||
]; | ||
if (false == isset($parsedUrl['scheme'])) { | ||
throw new \LogicException(sprintf( | ||
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".', | ||
$parsedUrl['scheme'], | ||
implode('", "', array_keys($supported)) | ||
)); | ||
} | ||
if ('mongodb:' === $dsn) { | ||
return [ | ||
'uri' => 'mongodb://127.0.0.1/', | ||
]; | ||
} | ||
$config['uri'] = $dsn; | ||
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) { | ||
$pathParts = explode('/', $parsedUrl['path']); | ||
//DB name | ||
if ($pathParts[1]) { | ||
$config['dbname'] = $pathParts[1]; | ||
} | ||
} | ||
if (isset($parsedUrl['query'])) { | ||
$queryParts = null; | ||
parse_str($parsedUrl['query'], $queryParts); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use of undefined var, define |
||
//get enqueue attributes values | ||
if (!empty($queryParts['polling_interval'])) { | ||
$config['polling_interval'] = $queryParts['polling_interval']; | ||
} | ||
if (!empty($queryParts['enqueue_collection'])) { | ||
$config['collection_name'] = $queryParts['enqueue_collection']; | ||
} | ||
} | ||
|
||
return $config; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add an empty line after the service definition.