Skip to content

Commit 126e4fd

Browse files
committed
fos:elastica:populate via message queue
- Decouple processor from consumption. Sync with latest changes in dev repo. - Inject queue context from container. - reject redelivered messages.
1 parent 02cea43 commit 126e4fd

File tree

9 files changed

+318
-3
lines changed

9 files changed

+318
-3
lines changed

symfony/app/AppKernel.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ public function registerBundles()
1515
new Doctrine\Bundle\DoctrineBundle\DoctrineBundle(),
1616
new Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle(),
1717
new AppBundle\AppBundle(),
18-
new Enqueue\Bundle\EnqueueBundle()
18+
new Enqueue\Bundle\EnqueueBundle(),
19+
new FOS\ElasticaBundle\FOSElasticaBundle(),
1920
];
2021

2122
if (in_array($this->getEnvironment(), ['dev', 'test'], true)) {

symfony/app/config/config.yml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,29 @@ enqueue:
7676
router_queue: 'router'
7777
default_processor_queue: 'default'
7878

79+
fos_elastica:
80+
clients:
81+
default: { host: %elasticsearch_host%, port: %elasticsearch_port% }
82+
indexes:
83+
app:
84+
index_name: app_%kernel.environment%
85+
types:
86+
blog:
87+
mappings:
88+
text: ~
89+
persistence:
90+
driver: orm
91+
model: AppBundle\Entity\Blog
92+
provider: ~
93+
listener: ~
94+
finder: ~
7995
services:
8096
app.async.say_hello_processor:
8197
class: 'AppBundle\Async\SayHelloProcessor'
8298
tags:
83-
- { name: 'enqueue.client.message_processor' }
99+
- { name: 'enqueue.client.message_processor' }
100+
101+
app.async.elastica_populate_processor:
102+
class: 'AppBundle\Async\ElasticaPopulateProcessor'
103+
arguments:
104+
- '@fos_elastica.provider_registry'

symfony/composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
"enqueue/amqp-ext": "*",
2626
"enqueue/enqueue-bundle": "*",
2727
"enqueue/job-queue": "*",
28-
"enqueue/test": "*"
28+
"enqueue/test": "*",
29+
"friendsofsymfony/elastica-bundle": "^3",
30+
"fzaninotto/faker": "^1.6"
2931
},
3032
"require-dev": {
3133
"sensio/generator-bundle": "^3.0",

symfony/src/AppBundle/AppBundle.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace AppBundle;
44

55
use AppBundle\Async\Topics;
6+
use AppBundle\Elasticsearch\AsyncProviderCompilerPass;
67
use Enqueue\Bundle\DependencyInjection\Compiler\AddTopicMetaPass;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
89
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -12,5 +13,7 @@ class AppBundle extends Bundle
1213
public function build(ContainerBuilder $container)
1314
{
1415
$container->addCompilerPass(AddTopicMetaPass::create()->add(Topics::SAY_HELLO));
16+
17+
$container->addCompilerPass(new AsyncProviderCompilerPass());
1518
}
1619
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
namespace AppBundle\Async;
3+
4+
use Enqueue\Psr\Context;
5+
use Enqueue\Psr\Message;
6+
use Enqueue\Psr\Processor;
7+
use Enqueue\Util\JSON;
8+
use FOS\ElasticaBundle\Provider\ProviderRegistry;
9+
10+
class ElasticaPopulateProcessor implements Processor
11+
{
12+
/**
13+
* @var ProviderRegistry
14+
*/
15+
private $providerRegistry;
16+
17+
/**
18+
* @param ProviderRegistry $providerRegistry
19+
*/
20+
public function __construct(ProviderRegistry $providerRegistry)
21+
{
22+
$this->providerRegistry = $providerRegistry;
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*/
28+
public function process(Message $message, Context $context)
29+
{
30+
if ($message->isRedelivered()) {
31+
$replyMessage = $context->createMessage(false);
32+
$replyQueue = $context->createQueue($message->getReplyTo());
33+
$context->createProducer()->send($replyQueue, $replyMessage);
34+
35+
return self::REJECT;
36+
}
37+
38+
$options = JSON::decode($message->getBody());
39+
40+
$provider = $this->providerRegistry->getProvider($options['indexName'], $options['typeName']);
41+
$provider->populate(null, $options);
42+
43+
$replyMessage = $context->createMessage(true);
44+
$replyQueue = $context->createQueue($message->getReplyTo());
45+
$context->createProducer()->send($replyQueue, $replyMessage);
46+
47+
return self::ACK;
48+
}
49+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
namespace AppBundle\Command;
3+
4+
use AppBundle\Entity\Blog;
5+
use Doctrine\ORM\EntityManagerInterface;
6+
use Symfony\Component\Console\Command\Command;
7+
use Symfony\Component\Console\Input\InputInterface;
8+
use Symfony\Component\Console\Input\InputOption;
9+
use Symfony\Component\Console\Output\OutputInterface;
10+
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
11+
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
12+
13+
class GenerateBlogsCommand extends Command implements ContainerAwareInterface
14+
{
15+
use ContainerAwareTrait;
16+
17+
protected function configure()
18+
{
19+
$this
20+
->setName('app:generate-blogs')
21+
->addOption('number', null, InputOption::VALUE_REQUIRED)
22+
;
23+
}
24+
25+
protected function execute(InputInterface $input, OutputInterface $output)
26+
{
27+
$faker = \Faker\Factory::create();
28+
/** @var EntityManagerInterface $em */
29+
$em = $this->container->get('doctrine.orm.entity_manager');
30+
31+
for ($i = 1; $i <= $input->getOption('number'); $i++) {
32+
$blog = new Blog();
33+
$blog->setText($faker->paragraphs(3, true));
34+
35+
$em->persist($blog);
36+
37+
if (0 == ($i % 100)) {
38+
$em->flush();
39+
$em->clear();
40+
41+
$output->writeln('Saved 100');
42+
}
43+
44+
}
45+
46+
$em->flush();
47+
$em->clear();
48+
49+
$output->writeln('Saved the rest');
50+
}
51+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
namespace AppBundle\Elasticsearch;
3+
4+
use Enqueue\AmqpExt\AmqpContext;
5+
use Enqueue\Psr\Context;
6+
use Enqueue\Util\JSON;
7+
use FOS\ElasticaBundle\Doctrine\ORM\Provider;
8+
9+
class AsyncProvider extends Provider
10+
{
11+
private $batchSize;
12+
13+
/**
14+
* @var Context
15+
*/
16+
private $context;
17+
18+
/**
19+
* @param Context $context
20+
*/
21+
public function setContext(Context $context)
22+
{
23+
$this->context = $context;
24+
}
25+
26+
/**
27+
* {@inheritDoc}
28+
*/
29+
protected function doPopulate($options, \Closure $loggerClosure = null)
30+
{
31+
$this->batchSize = null;
32+
if ($options['real_populate']) {
33+
$this->batchSize = $options['offset'] + $options['batch_size'];
34+
35+
return parent::doPopulate($options, $loggerClosure);
36+
}
37+
38+
/** @var AmqpContext $amqpContext */
39+
$amqpContext = $this->context;
40+
41+
$queryBuilder = $this->createQueryBuilder($options['query_builder_method']);
42+
$nbObjects = $this->countObjects($queryBuilder);
43+
$offset = $options['offset'];
44+
45+
$queue = $amqpContext->createQueue('fos_elastica.populate');
46+
$queue->addFlag(AMQP_DURABLE);
47+
$amqpContext->declareQueue($queue);
48+
49+
$resultQueue = $amqpContext->createTemporaryQueue();
50+
$consumer = $amqpContext->createConsumer($resultQueue);
51+
52+
$producer = $amqpContext->createProducer();
53+
54+
$nbMessages = 0;
55+
for (; $offset < $nbObjects; $offset += $options['batch_size']) {
56+
$options['offset'] = $offset;
57+
$options['real_populate'] = true;
58+
$message = $amqpContext->createMessage(JSON::encode($options));
59+
$message->setReplyTo($resultQueue->getQueueName());
60+
$producer->send($queue, $message);
61+
62+
$nbMessages++;
63+
}
64+
65+
while ($nbMessages) {
66+
if ($message = $consumer->receive(1000)) {
67+
if (null !== $loggerClosure) {
68+
$loggerClosure($options['batch_size'], $nbObjects);
69+
}
70+
71+
$nbMessages--;
72+
}
73+
}
74+
}
75+
76+
/**
77+
* {@inheritDoc}
78+
*/
79+
protected function countObjects($queryBuilder)
80+
{
81+
return $this->batchSize ? $this->batchSize : parent::countObjects($queryBuilder);
82+
}
83+
84+
/**
85+
* {@inheritDoc}
86+
*/
87+
protected function configureOptions()
88+
{
89+
parent::configureOptions();
90+
91+
$this->resolver->setDefaults([
92+
'real_populate' => false,
93+
]);
94+
}
95+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
namespace AppBundle\Elasticsearch;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
use Symfony\Component\DependencyInjection\Reference;
7+
8+
class AsyncProviderCompilerPass implements CompilerPassInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function process(ContainerBuilder $container)
14+
{
15+
foreach ($container->getExtensionConfig('fos_elastica') as $config) {
16+
foreach ($config['indexes'] as $index => $indexData) {
17+
foreach ($indexData['types'] as $type => $typeData) {
18+
if ('orm' != $typeData['persistence']['driver']) {
19+
continue;
20+
}
21+
22+
$providerId = sprintf('fos_elastica.provider.%s.%s', $index, $type);
23+
if (false == $container->hasDefinition($providerId)) {
24+
continue;
25+
}
26+
27+
$provider = $container->getDefinition($providerId);
28+
$provider->setClass(AsyncProvider::class);
29+
$provider->addMethodCall('setContext', [new Reference('enqueue.transport.context')]);
30+
}
31+
}
32+
}
33+
}
34+
}

symfony/src/AppBundle/Entity/Blog.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
namespace AppBundle\Entity;
3+
4+
use Doctrine\ORM\Mapping as ORM;
5+
6+
/**
7+
* @ORM\Entity
8+
* @ORM\Table(name="blogs")
9+
*/
10+
class Blog
11+
{
12+
/**
13+
* @var int
14+
*
15+
* @ORM\Column(type="integer")
16+
* @ORM\Id
17+
* @ORM\GeneratedValue(strategy="AUTO")
18+
*/
19+
private $id;
20+
21+
/**
22+
* @var string
23+
*
24+
* @ORM\Column(type="text")
25+
*/
26+
private $text;
27+
28+
/**
29+
* @return string
30+
*/
31+
public function getText()
32+
{
33+
return $this->text;
34+
}
35+
36+
/**
37+
* @return int
38+
*/
39+
public function getId()
40+
{
41+
return $this->id;
42+
}
43+
44+
/**
45+
* @param int $id
46+
*/
47+
public function setId($id)
48+
{
49+
$this->id = $id;
50+
}
51+
52+
/**
53+
* @param string $text
54+
*/
55+
public function setText($text)
56+
{
57+
$this->text = $text;
58+
}
59+
}

0 commit comments

Comments
 (0)