Skip to content

Added: ability to choose different entity manager #1081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 24, 2020
1 change: 1 addition & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ enqueue:
queue_name: ~
job:
enabled: false
default_mapping: true
async_events:
enabled: false
extensions:
Expand Down
4 changes: 4 additions & 0 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ enqueue:
# plus basic bundle configuration

job: true

# adds bundle's default Job entity mapping to application's entity manager.
# set it to false when using your own mapped entities for jobs.
default_mapping: true

doctrine:
# plus basic bundle configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ class DoctrineClearIdentityMapExtension implements MessageReceivedExtensionInter
*/
protected $registry;

/**
* @param ManagerRegistry $registry
*/
public function __construct(ManagerRegistry $registry)
{
$this->registry = $registry;
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ private function getJobConfiguration(): ArrayNodeDefinition
}

return (new ArrayNodeDefinition('job'))
->children()
->booleanNode('default_mapping')
->defaultTrue()
->info('Adds bundle\'s default Job entity mapping to application\'s entity manager')
->end()
->end()
->addDefaultsIfNotSet()
->canBeEnabled()
;
Expand Down
8 changes: 8 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain
return;
}

foreach ($container->getExtensionConfig('enqueue') as $modules) {
foreach ($modules as $config) {
if (isset($config['job']) && false === $config['job']['default_mapping']) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That does not look accurate. For example one config defines false and the next defines true. The merged value might be true but this logic would not load default entites.

There must be a method that allows getting merged version of the config. But I am not sure. Could you please look for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for taking so long.
I was not able to find a solution for this yet, but this part of code is happening before the merging of configuration, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should merge it manually and use the result in condition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symfony provides tools for that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe

Copy link
Contributor

@Steveb-p Steveb-p Sep 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://symfony.com/doc/current/bundles/configuration.html#processing-the-configs-array

EDIT: However I'm surprised that the configuration isn't already resolved at this point. Processing it more than once should not be an issue, since in production it would be cached if I understand correctly, but still...

return;
}
}
}

foreach ($container->getExtensionConfig('doctrine') as $config) {
// do not register mappings if dbal not configured.
if (!empty($config['dbal'])) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/enqueue-bundle/Tests/Functional/App/AppKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public function getLogDir()
return sys_get_temp_dir().'/EnqueueBundle/cache/logs';
}

/**
* @param \Symfony\Component\Config\Loader\LoaderInterface $loader
*/
public function registerContainerConfiguration(LoaderInterface $loader)
{
$loader->load(__DIR__.'/config/config.yml');
Expand Down
11 changes: 1 addition & 10 deletions pkg/job-queue/CalculateRootJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@ class CalculateRootJobStatusService
*/
private $jobStorage;

/**
* @param JobStorage $jobStorage
*/
public function __construct(JobStorage $jobStorage)
{
$this->jobStorage = $jobStorage;
}

/**
* @param Job $job
*
* @return bool true if root job was stopped
*/
public function calculate(Job $job)
Expand Down Expand Up @@ -91,11 +86,7 @@ protected function calculateRootJobStatus(array $jobs)
$success++;
break;
default:
throw new \LogicException(sprintf(
'Got unsupported job status: id: "%s" status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Got unsupported job status: id: "%s" status: "%s"', $job->getId(), $job->getStatus()));
}
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/job-queue/DependentJobContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ class DependentJobContext
*/
private $dependentJobs;

/**
* @param Job $job
*/
public function __construct(Job $job)
{
$this->job = $job;
Expand Down
5 changes: 0 additions & 5 deletions pkg/job-queue/DependentJobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ class DependentJobProcessor implements Processor, TopicSubscriberInterface
*/
private $logger;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
* @param LoggerInterface $logger
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer, LoggerInterface $logger)
{
$this->jobStorage = $jobStorage;
Expand Down
10 changes: 1 addition & 9 deletions pkg/job-queue/DependentJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,17 @@ public function __construct(JobStorage $jobStorage)
}

/**
* @param Job $job
*
* @return DependentJobContext
*/
public function createDependentJobContext(Job $job)
{
return new DependentJobContext($job);
}

/**
* @param DependentJobContext $context
*/
public function saveDependentJob(DependentJobContext $context)
{
if (!$context->getJob()->isRoot()) {
throw new \LogicException(sprintf(
'Only root jobs allowed but got child. jobId: "%s"',
$context->getJob()->getId()
));
throw new \LogicException(sprintf('Only root jobs allowed but got child. jobId: "%s"', $context->getJob()->getId()));
}

$this->jobStorage->saveJob($context->getJob(), function (Job $job) use ($context) {
Expand Down
21 changes: 4 additions & 17 deletions pkg/job-queue/Doctrine/JobStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class JobStorage
private $uniqueTableName;

/**
* @param ManagerRegistry $doctrine
* @param string $entityClass
* @param string $uniqueTableName
* @param string $entityClass
* @param string $uniqueTableName
*/
public function __construct(ManagerRegistry $doctrine, $entityClass, $uniqueTableName)
{
Expand Down Expand Up @@ -90,7 +89,6 @@ public function findRootJobByOwnerIdAndJobName($ownerId, $jobName)

/**
* @param string $name
* @param Job $rootJob
*
* @return Job
*/
Expand Down Expand Up @@ -119,20 +117,13 @@ public function createJob()
}

/**
* @param Job $job
* @param \Closure|null $lockCallback
*
* @throws DuplicateJobException
*/
public function saveJob(Job $job, \Closure $lockCallback = null)
{
$class = $this->getEntityRepository()->getClassName();
if (!$job instanceof $class) {
throw new \LogicException(sprintf(
'Got unexpected job instance: expected: "%s", actual" "%s"',
$class,
get_class($job)
));
throw new \LogicException(sprintf('Got unexpected job instance: expected: "%s", actual" "%s"', $class, get_class($job)));
}

if ($lockCallback) {
Expand Down Expand Up @@ -175,11 +166,7 @@ public function saveJob(Job $job, \Closure $lockCallback = null)
]);
}
} catch (UniqueConstraintViolationException $e) {
throw new DuplicateJobException(sprintf(
'Duplicate job. ownerId:"%s", name:"%s"',
$job->getOwnerId(),
$job->getName()
));
throw new DuplicateJobException(sprintf('Duplicate job. ownerId:"%s", name:"%s"', $job->getOwnerId(), $job->getName()));
}

$this->getEntityManager()->persist($job);
Expand Down
9 changes: 0 additions & 9 deletions pkg/job-queue/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ public function getCreatedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $createdAt
*/
public function setCreatedAt(\DateTime $createdAt)
{
Expand All @@ -258,8 +256,6 @@ public function getStartedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $startedAt
*/
public function setStartedAt(\DateTime $startedAt)
{
Expand All @@ -279,8 +275,6 @@ public function getStoppedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $stoppedAt
*/
public function setStoppedAt(\DateTime $stoppedAt)
{
Expand Down Expand Up @@ -324,9 +318,6 @@ public function getData()
return $this->data;
}

/**
* @param array $data
*/
public function setData(array $data)
{
$this->data = $data;
Expand Down
46 changes: 4 additions & 42 deletions pkg/job-queue/JobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class JobProcessor
*/
private $producer;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer)
{
$this->jobStorage = $jobStorage;
Expand Down Expand Up @@ -74,7 +70,6 @@ public function findOrCreateRootJob($ownerId, $jobName, $unique = false)

/**
* @param string $jobName
* @param Job $rootJob
*
* @return Job
*/
Expand Down Expand Up @@ -104,9 +99,6 @@ public function findOrCreateChildJob($jobName, Job $rootJob)
return $job;
}

/**
* @param Job $job
*/
public function startChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -116,11 +108,7 @@ public function startChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_NEW !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can start only new jobs: id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can start only new jobs: id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_RUNNING);
Expand All @@ -131,9 +119,6 @@ public function startChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function successChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -143,11 +128,7 @@ public function successChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can success only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can success only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_SUCCESS);
Expand All @@ -158,9 +139,6 @@ public function successChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function failChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -170,11 +148,7 @@ public function failChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can fail only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can fail only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_FAILED);
Expand All @@ -185,9 +159,6 @@ public function failChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function cancelChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -197,11 +168,7 @@ public function cancelChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (!in_array($job->getStatus(), [Job::STATUS_NEW, Job::STATUS_RUNNING], true)) {
throw new \LogicException(sprintf(
'Can cancel only new or running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can cancel only new or running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_CANCELLED);
Expand All @@ -217,7 +184,6 @@ public function cancelChildJob(Job $job)
}

/**
* @param Job $job
* @param bool $force
*/
public function interruptRootJob(Job $job, $force = false)
Expand Down Expand Up @@ -245,8 +211,6 @@ public function interruptRootJob(Job $job, $force = false)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function saveJob(Job $job)
{
Expand All @@ -255,8 +219,6 @@ protected function saveJob(Job $job)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function sendCalculateRootJobStatusEvent(Job $job)
{
Expand Down
Loading