Skip to content

Commit 311bfdf

Browse files
committed
Added write lock
1 parent 41823bf commit 311bfdf

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

Diff for: pkg/dbal/DbalConsumerHelperTrait.php

+13-9
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,22 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive
2020

2121
$this->getConnection()->beginTransaction();
2222

23-
$message = $this->getConnection()->createQueryBuilder()
23+
$query = $this->getConnection()->createQueryBuilder()
2424
->select('*')
2525
->from($this->getContext()->getTableName())
2626
->andWhere('delivery_id IS NULL')
2727
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
2828
->andWhere('queue IN (:queues)')
2929
->addOrderBy('priority', 'desc')
3030
->addOrderBy('published_at', 'asc')
31-
->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER)
32-
->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY)
33-
->setMaxResults(1)
34-
->execute()
35-
->fetch()
36-
;
31+
->setMaxResults(1);
32+
33+
// select for update
34+
$message = $this->getConnection()->executeQuery(
35+
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
36+
['delayedUntil' => $now, 'queues' => array_values($queues)],
37+
['delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY]
38+
)->fetch();
3739

3840
if (!$message) {
3941
$this->getConnection()->commit();
@@ -50,7 +52,8 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive
5052
->setParameter('id', $message['id'], Type::GUID)
5153
->setParameter('deliveryId', $deliveryId, Type::STRING)
5254
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
53-
->execute();
55+
->execute()
56+
;
5457

5558
$this->getConnection()->commit();
5659

@@ -61,7 +64,8 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive
6164
->setParameter('deliveryId', $deliveryId, Type::STRING)
6265
->setMaxResults(1)
6366
->execute()
64-
->fetch();
67+
->fetch()
68+
;
6569

6670
return $deliveredMessage ?: null;
6771
} catch (\Exception $e) {

Diff for: pkg/dbal/Tests/Functional/DbalConsumerTest.php

+1-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public function testShouldSetPublishedAtDateToReceivedMessage()
4949

5050
$producer = $context->createProducer();
5151

52-
/** @var DbalMessage $message */
5352
$message = $context->createMessage($expectedBody);
5453
$message->setPublishedAt($time);
5554
$producer->send($queue, $message);
@@ -62,7 +61,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage()
6261
$this->assertSame($time, $message->getPublishedAt());
6362
}
6463

65-
public function testShouldOrderMessagesWithSamePriorityByPublishedAtDateee()
64+
public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate()
6665
{
6766
$context = $this->context;
6867
$queue = $context->createQueue(__METHOD__);
@@ -80,7 +79,6 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDateee()
8079

8180
$producer = $context->createProducer();
8281

83-
/** @var DbalMessage $message */
8482
$message = $context->createMessage($expectedPriority5Body);
8583
$message->setPriority(5);
8684
$message->setPublishedAt($time);

0 commit comments

Comments
 (0)