Skip to content

Commit 41823bf

Browse files
committed
United methods in trait. Covered by transaction
1 parent 697d951 commit 41823bf

File tree

5 files changed

+125
-85
lines changed

5 files changed

+125
-85
lines changed

Diff for: pkg/dbal/DbalConsumer.php

+9-27
Original file line numberDiff line numberDiff line change
@@ -87,37 +87,19 @@ public function getQueue(): Queue
8787

8888
public function receiveNoWait(): ?Message
8989
{
90-
$this->redeliverMessages();
91-
92-
$this->getConnection()->beginTransaction();
93-
try {
94-
$deliveryId = (string) Uuid::uuid1();
95-
96-
// get top message from the queue
97-
$message = $this->fetchMessage([$this->queue->getQueueName()]);
98-
99-
if (null == $message) {
100-
$this->getConnection()->commit();
101-
102-
return null;
103-
}
90+
$deliveryId = (string) Uuid::uuid1();
91+
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
10492

105-
$this->markMessageAsDeliveredToConsumer($message, $deliveryId);
106-
107-
$dbalMessage = $this->getMessageByDeliveryId($deliveryId);
108-
109-
$this->getConnection()->commit();
93+
$this->redeliverMessages();
11094

111-
if ($dbalMessage['redelivered'] || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) {
112-
return $this->getContext()->convertMessage($dbalMessage);
95+
// get top message from the queue
96+
if ($message = $this->fetchMessage([$this->queue->getQueueName()], $deliveryId, $redeliveryDelay)) {
97+
if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) {
98+
return $this->getContext()->convertMessage($message);
11399
}
114-
115-
return null;
116-
} catch (\Exception $e) {
117-
$this->dbal->rollBack();
118-
119-
throw $e;
120100
}
101+
102+
return null;
121103
}
122104

123105
/**

Diff for: pkg/dbal/DbalConsumerHelperTrait.php

+50-44
Original file line numberDiff line numberDiff line change
@@ -13,55 +13,61 @@ abstract public function getContext(): DbalContext;
1313

1414
abstract public function getConnection(): Connection;
1515

16-
protected function fetchMessage(array $queues): ?array
16+
protected function fetchMessage(array $queues, string $deliveryId, int $redeliveryDelay): ?array
1717
{
18-
$now = time();
18+
try {
19+
$now = time();
1920

20-
$result = $this->getConnection()->createQueryBuilder()
21-
->select('*')
22-
->from($this->getContext()->getTableName())
23-
->andWhere('delivery_id IS NULL')
24-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
25-
->andWhere('queue IN (:queues)')
26-
->addOrderBy('priority', 'desc')
27-
->addOrderBy('published_at', 'asc')
28-
->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER)
29-
->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY)
30-
->setMaxResults(1)
31-
->execute()
32-
->fetch()
33-
;
21+
$this->getConnection()->beginTransaction();
3422

35-
return $result ?: null;
36-
}
23+
$message = $this->getConnection()->createQueryBuilder()
24+
->select('*')
25+
->from($this->getContext()->getTableName())
26+
->andWhere('delivery_id IS NULL')
27+
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
28+
->andWhere('queue IN (:queues)')
29+
->addOrderBy('priority', 'desc')
30+
->addOrderBy('published_at', 'asc')
31+
->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER)
32+
->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY)
33+
->setMaxResults(1)
34+
->execute()
35+
->fetch()
36+
;
3737

38-
protected function markMessageAsDeliveredToConsumer(array $message, string $deliveryId): void
39-
{
40-
$now = time();
41-
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
38+
if (!$message) {
39+
$this->getConnection()->commit();
4240

43-
$this->getConnection()->createQueryBuilder()
44-
->andWhere('id = :id')
45-
->update($this->getContext()->getTableName())
46-
->set('delivery_id', ':deliveryId')
47-
->set('redeliver_after', ':redeliverAfter')
48-
->setParameter('id', $message['id'], Type::GUID)
49-
->setParameter('deliveryId', $deliveryId, Type::STRING)
50-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
51-
->execute()
52-
;
53-
}
41+
return null;
42+
}
5443

55-
protected function getMessageByDeliveryId(string $deliveryId): array
56-
{
57-
return $this->getConnection()->createQueryBuilder()
58-
->select('*')
59-
->from($this->getContext()->getTableName())
60-
->andWhere('delivery_id = :deliveryId')
61-
->setParameter('deliveryId', $deliveryId, Type::STRING)
62-
->setMaxResults(1)
63-
->execute()
64-
->fetch()
65-
;
44+
// mark message as delivered to consumer
45+
$this->getConnection()->createQueryBuilder()
46+
->andWhere('id = :id')
47+
->update($this->getContext()->getTableName())
48+
->set('delivery_id', ':deliveryId')
49+
->set('redeliver_after', ':redeliverAfter')
50+
->setParameter('id', $message['id'], Type::GUID)
51+
->setParameter('deliveryId', $deliveryId, Type::STRING)
52+
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
53+
->execute();
54+
55+
$this->getConnection()->commit();
56+
57+
$deliveredMessage = $this->getConnection()->createQueryBuilder()
58+
->select('*')
59+
->from($this->getContext()->getTableName())
60+
->andWhere('delivery_id = :deliveryId')
61+
->setParameter('deliveryId', $deliveryId, Type::STRING)
62+
->setMaxResults(1)
63+
->execute()
64+
->fetch();
65+
66+
return $deliveredMessage ?: null;
67+
} catch (\Exception $e) {
68+
$this->getConnection()->rollBack();
69+
70+
throw $e;
71+
}
6672
}
6773
}

Diff for: pkg/dbal/DbalContext.php

-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,6 @@ public function createDataBaseTable(): void
240240
$table->addIndex(['priority', 'published_at']);
241241
$table->addIndex(['redeliver_after']);
242242
$table->addUniqueIndex(['delivery_id']);
243-
$table->addIndex(['delivery_id']);
244243

245244
$sm->createTable($table);
246245
}

Diff for: pkg/dbal/DbalSubscriptionConsumer.php

+2-7
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public function consume(int $timeout = 0): void
8181
$now = time();
8282
$timeout /= 1000;
8383
$deliveryId = (string) Uuid::uuid1();
84+
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
8485

8586
$queueNames = [];
8687
foreach (array_keys($this->subscribers) as $queueName) {
@@ -93,13 +94,7 @@ public function consume(int $timeout = 0): void
9394
$currentQueueNames = $queueNames;
9495
}
9596

96-
$message = $this->fetchMessage($currentQueueNames);
97-
98-
if ($message) {
99-
$this->markMessageAsDeliveredToConsumer($message, $deliveryId);
100-
101-
$message = $this->getMessageByDeliveryId($deliveryId);
102-
97+
if ($message = $this->fetchMessage($currentQueueNames, $deliveryId, $redeliveryDelay)) {
10398
$dbalMessage = $this->getContext()->convertMessage($message);
10499

105100
/**

Diff for: pkg/dbal/Tests/DbalConsumerTest.php

+64-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace Enqueue\Dbal\Tests;
66

7+
use Doctrine\DBAL\Connection;
8+
use Doctrine\DBAL\Types\Type;
79
use Enqueue\Dbal\DbalConsumer;
810
use Enqueue\Dbal\DbalContext;
911
use Enqueue\Dbal\DbalDestination;
@@ -38,10 +40,40 @@ public function testShouldReturnInstanceOfDestination()
3840
$this->assertSame($destination, $consumer->getQueue());
3941
}
4042

41-
public function testCouldCallAcknowledgedMethod()
43+
public function testShouldDeleteMessageOnAcknowledge()
4244
{
43-
$consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue'));
44-
$consumer->acknowledge(new DbalMessage());
45+
$queue = new DbalDestination('queue');
46+
47+
$message = new DbalMessage();
48+
$message->setBody('theBody');
49+
$message->setDeliveryId('foo-delivery-id');
50+
51+
$dbal = $this->createConectionMock();
52+
$dbal
53+
->expects($this->once())
54+
->method('delete')
55+
->with(
56+
'some-table-name',
57+
['delivery_id' => $message->getDeliveryId()],
58+
['delivery_id' => Type::STRING]
59+
)
60+
;
61+
62+
$context = $this->createContextMock();
63+
$context
64+
->expects($this->once())
65+
->method('getDbalConnection')
66+
->will($this->returnValue($dbal))
67+
;
68+
$context
69+
->expects($this->once())
70+
->method('getTableName')
71+
->will($this->returnValue('some-table-name'))
72+
;
73+
74+
$consumer = new DbalConsumer($context, $queue);
75+
76+
$consumer->acknowledge($message);
4577
}
4678

4779
public function testCouldSetAndGetPollingInterval()
@@ -77,17 +109,35 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()
77109
$consumer->reject(new InvalidMessage());
78110
}
79111

80-
public function testShouldDoNothingOnReject()
112+
public function testShouldDeleteMessageFromQueueOnReject()
81113
{
82114
$queue = new DbalDestination('queue');
83115

84116
$message = new DbalMessage();
85117
$message->setBody('theBody');
118+
$message->setDeliveryId('foo-delivery-id');
119+
120+
$dbal = $this->createConectionMock();
121+
$dbal
122+
->expects($this->once())
123+
->method('delete')
124+
->with(
125+
'some-table-name',
126+
['delivery_id' => $message->getDeliveryId()],
127+
['delivery_id' => Type::STRING]
128+
)
129+
;
86130

87131
$context = $this->createContextMock();
88132
$context
89-
->expects($this->never())
90-
->method('createProducer')
133+
->expects($this->once())
134+
->method('getDbalConnection')
135+
->will($this->returnValue($dbal))
136+
;
137+
$context
138+
->expects($this->once())
139+
->method('getTableName')
140+
->will($this->returnValue('some-table-name'))
91141
;
92142

93143
$consumer = new DbalConsumer($context, $queue);
@@ -136,6 +186,14 @@ private function createContextMock()
136186
{
137187
return $this->createMock(DbalContext::class);
138188
}
189+
190+
/**
191+
* @return \PHPUnit_Framework_MockObject_MockObject|DbalContext
192+
*/
193+
private function createConectionMock()
194+
{
195+
return $this->createMock(Connection::class);
196+
}
139197
}
140198

141199
class InvalidMessage implements Message

0 commit comments

Comments
 (0)