Skip to content

[psr] Introduce MessageProcessor interface (moved from consumption). #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ Guaranty that there is only single job running with such name.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Psr\Context;
use Enqueue\Util\JSON;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;

class MessageProcessor implements MessageProcessorInterface
class ReindexProcessor implements Processor
{
/**
* @var JobRunner
Expand All @@ -43,7 +42,7 @@ class MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand All @@ -54,16 +53,15 @@ Run several sub jobs in parallel.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Util\JSON;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;

class Step1MessageProcessor implements MessageProcessorInterface
class Step1Processor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -102,11 +100,11 @@ class Step1MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}

class Step2MessageProcessor implements MessageProcessorInterface
class Step2Processor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -138,17 +136,15 @@ just after all steps are finished.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\JobQueue\DependentJobService;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Util\JSON;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;

class MessageProcessor implements MessageProcessorInterface
class ReindexProcessor implements Processor
{
/**
* @var JobRunner
Expand Down Expand Up @@ -182,7 +178,7 @@ class MessageProcessor implements MessageProcessorInterface
}
);

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand Down
15 changes: 7 additions & 8 deletions docs/bundle/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ To consume messages you have to first create a message processor:
<?php
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Processor;
use Enqueue\Client\TopicSubscriberInterface;

class FooMessageProcessor implements MessageProcessorInterface, TopicSubscriberInterface
class FooProcessor implements Processor, TopicSubscriberInterface
{
public function process(Message $message, Context $session)
{
echo $message->getBody();

return Result::ACK;
// return Result::REJECT; // when the message is broken
// return Result::REQUEUE; // the message is fine but you want to postpone processing
return self::ACK;
// return self::REJECT; // when the message is broken
// return self::REQUEUE; // the message is fine but you want to postpone processing
}

public static function getSubscribedTopics()
Expand All @@ -72,9 +71,9 @@ Register it as a container service and subscribe to the topic:

```yaml
foo_message_processor:
class: 'FooMessageProcessor'
class: 'FooProcessor'
tags:
- { name: 'enqueue.client.message_processor' }
- { name: 'enqueue.client.processor' }
```

Now you can start consuming messages:
Expand Down
9 changes: 4 additions & 5 deletions docs/job_queue/run_sub_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ They will be executed in parallel.

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Client\MessageProducerInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;
use Enqueue\JobQueue\JobRunner;
use Enqueue\JobQueue\Job;
use Enqueue\Util\JSON;

class RootJobMessageProcessor implements MessageProcessorInterface
class RootJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand All @@ -36,11 +35,11 @@ class RootJobMessageProcessor implements MessageProcessorInterface
return true;
});

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}

class SubJobMessageProcessor implements MessageProcessorInterface
class SubJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand Down
7 changes: 3 additions & 4 deletions docs/job_queue/run_unique_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ It shows how you can run unique job using job queue (The configuration is descri

```php
<?php
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;
use Enqueue\JobQueue\JobRunner;

class MessageProcessor implements MessageProcessorInterface
class UniqueJobProcessor implements Processor
{
/** @var JobRunner */
private $jobRunner;
Expand All @@ -31,7 +30,7 @@ class MessageProcessor implements MessageProcessorInterface
return true; // if you want to ACK message or false to REJECT
});

return $result ? Result::ACK : Result::REJECT;
return $result ? self::ACK : self::REJECT;
}
}
```
Expand Down
14 changes: 7 additions & 7 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ The `consume` method starts the consumption process which last as long as it is
```php
<?php
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Result;

/** @var \Enqueue\Psr\Context $psrContext */

Expand All @@ -75,12 +75,12 @@ $queueConsumer = new QueueConsumer($psrContext);
$queueConsumer->bind('foo_queue', function(Message $message) {
// process messsage

return Result::ACK;
return Processor::ACK;
});
$queueConsumer->bind('bar_queue', function(Message $message) {
// process messsage

return Result::ACK;
return Processor::ACK;
});

$queueConsumer->consume();
Expand Down Expand Up @@ -167,16 +167,16 @@ Here's an example of how you can send and consume messages.
```php
<?php
use Enqueue\Client\SimpleClient;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;

/** @var \Enqueue\Psr\Context $psrClient */
/** @var \Enqueue\Psr\Context $psrContext */

$client = new SimpleClient($psrClient);
$client = new SimpleClient($psrContext);
$client->bind('foo_topic', function (Message $message) {
// process message

return Result::ACK;
return Processor::ACK;
});

$client->send('foo_topic', 'Hello there!');
Expand Down
14 changes: 7 additions & 7 deletions pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
namespace Enqueue\AmqpExt\Tests\Functional;

use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\Context;
use Enqueue\Psr\Message;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\MessageProcessorInterface;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Result;
use Enqueue\Psr\Context;
use Enqueue\Psr\Message;
use Enqueue\Psr\Processor;
use Enqueue\Test\RabbitmqAmqpExtension;
use Enqueue\Test\RabbitmqManagmentExtensionTrait;

Expand Down Expand Up @@ -52,7 +52,7 @@ public function testConsumeOneMessageAndExit()
new LimitConsumptionTimeExtension(new \DateTime('+3sec')),
]));

$processor = new StubMessageProcessor();
$processor = new StubProcessor();
$queueConsumer->bind($queue, $processor);

$queueConsumer->consume();
Expand Down Expand Up @@ -81,10 +81,10 @@ public function testConsumeOneMessageAndSendReplyExit()

$replyMessage = $this->amqpContext->createMessage(__METHOD__.'.reply');

$processor = new StubMessageProcessor();
$processor = new StubProcessor();
$processor->result = Result::reply($replyMessage);

$replyProcessor = new StubMessageProcessor();
$replyProcessor = new StubProcessor();

$queueConsumer->bind($queue, $processor);
$queueConsumer->bind($replyQueue, $replyProcessor);
Expand All @@ -98,7 +98,7 @@ public function testConsumeOneMessageAndSendReplyExit()
}
}

class StubMessageProcessor implements MessageProcessorInterface
class StubProcessor implements Processor
{
public $result = Result::ACK;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildClientRoutingPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';
$routerId = 'enqueue.client.router_processor';

if (false == $container->hasDefinition($routerId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class BuildMessageProcessorRegistryPass implements CompilerPassInterface
class BuildProcessorRegistryPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorRegistryId = 'enqueue.client.message_processor_registry';
$processorTagName = 'enqueue.client.processor';
$processorRegistryId = 'enqueue.client.processor_registry';

if (false == $container->hasDefinition($processorRegistryId)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildQueueMetaRegistryPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';
$queueMetaRegistryId = 'enqueue.client.meta.queue_meta_registry';
if (false == $container->hasDefinition($queueMetaRegistryId)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class BuildTopicMetaSubscribersPass implements CompilerPassInterface
{
use ExtractMessageProcessorTagSubscriptionsTrait;
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.message_processor';
$processorTagName = 'enqueue.client.processor';

$topicsSubscribers = [];
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException;

trait ExtractMessageProcessorTagSubscriptionsTrait
trait ExtractProcessorTagSubscriptionsTrait
{
/**
* @param ContainerBuilder $container
Expand Down
8 changes: 4 additions & 4 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory;
use Enqueue\Symfony\DefaultTransportFactory;
use Enqueue\Symfony\NullTransportFactory;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildMessageProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
use Enqueue\Symfony\DefaultTransportFactory;
use Enqueue\Symfony\NullTransportFactory;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;

Expand All @@ -27,7 +27,7 @@ public function build(ContainerBuilder $container)
{
$container->addCompilerPass(new BuildExtensionsPass());
$container->addCompilerPass(new BuildClientRoutingPass());
$container->addCompilerPass(new BuildMessageProcessorRegistryPass());
$container->addCompilerPass(new BuildProcessorRegistryPass());
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
$container->addCompilerPass(new BuildQueueMetaRegistryPass());

Expand Down
Loading