Skip to content

Commit 10f507d

Browse files
committedOct 30, 2018
Moved common logic to trait
1 parent 6532d1e commit 10f507d

File tree

4 files changed

+123
-101
lines changed

4 files changed

+123
-101
lines changed
 

‎pkg/dbal/DbalConsumer.php

+26-55
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
class DbalConsumer implements Consumer
1717
{
18-
use ConsumerPollingTrait;
18+
use ConsumerPollingTrait,
19+
DbalConsumerHelperTrait;
1920

2021
/**
2122
* @var DbalContext
@@ -48,6 +49,16 @@ public function __construct(DbalContext $context, DbalDestination $queue)
4849
$this->redeliveryDelay = 1200000;
4950
}
5051

52+
public function getContext(): DbalContext
53+
{
54+
return $this->context;
55+
}
56+
57+
public function getConnection(): Connection
58+
{
59+
return $this->dbal;
60+
}
61+
5162
/**
5263
* Get interval between retry failed messages in milliseconds.
5364
*/
@@ -78,47 +89,27 @@ public function receiveNoWait(): ?Message
7889
{
7990
$this->redeliverMessages();
8091

81-
$this->dbal->beginTransaction();
92+
$this->getConnection()->beginTransaction();
8293
try {
83-
$now = (int) time();
84-
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
8594
$deliveryId = (string) Uuid::uuid1();
8695

8796
// get top message from the queue
88-
$message = $this->fetchMessage($now);
97+
$message = $this->fetchMessage([$this->queue->getQueueName()]);
8998

9099
if (null == $message) {
91-
$this->dbal->commit();
100+
$this->getConnection()->commit();
92101

93102
return null;
94103
}
95104

96-
// mark message as delivered to consumer
97-
$this->dbal->createQueryBuilder()
98-
->update($this->context->getTableName())
99-
->set('delivery_id', ':deliveryId')
100-
->set('redeliver_after', ':redeliverAfter')
101-
->andWhere('id = :id')
102-
->setParameter('id', $message['id'], Type::GUID)
103-
->setParameter('deliveryId', $deliveryId, Type::STRING)
104-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
105-
->execute()
106-
;
107-
108-
$dbalMessage = $this->dbal->createQueryBuilder()
109-
->select('*')
110-
->from($this->context->getTableName())
111-
->andWhere('delivery_id = :deliveryId')
112-
->setParameter('deliveryId', $deliveryId, Type::STRING)
113-
->setMaxResults(1)
114-
->execute()
115-
->fetch()
116-
;
117-
118-
$this->dbal->commit();
105+
$this->markMessageAsDeliveredToConsumer($message, $deliveryId);
106+
107+
$dbalMessage = $this->getMessageByDeliveryId($deliveryId);
108+
109+
$this->getConnection()->commit();
119110

120111
if ($dbalMessage['redelivered'] || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) {
121-
return $this->context->convertMessage($dbalMessage);
112+
return $this->getContext()->convertMessage($dbalMessage);
122113
}
123114

124115
return null;
@@ -145,7 +136,7 @@ public function reject(Message $message, bool $requeue = false): void
145136
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
146137

147138
if ($requeue) {
148-
$this->context->createProducer()->send($this->queue, $message);
139+
$this->getContext()->createProducer()->send($this->queue, $message);
149140

150141
return;
151142
}
@@ -155,37 +146,17 @@ public function reject(Message $message, bool $requeue = false): void
155146

156147
private function deleteMessage(?string $deliveryId): void
157148
{
158-
$this->dbal->delete(
159-
$this->context->getTableName(),
149+
$this->getConnection()->delete(
150+
$this->getContext()->getTableName(),
160151
['delivery_id' => $deliveryId],
161152
['delivery_id' => Type::STRING]
162153
);
163154
}
164155

165-
private function fetchMessage(int $now): ?array
166-
{
167-
$result = $this->dbal->createQueryBuilder()
168-
->select('*')
169-
->from($this->context->getTableName())
170-
->andWhere('delivery_id IS NULL')
171-
->andWhere('queue = :queue')
172-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
173-
->addOrderBy('priority', 'desc')
174-
->addOrderBy('published_at', 'asc')
175-
->setParameter('queue', $this->queue->getQueueName(), Type::STRING)
176-
->setParameter('delayedUntil', $now, Type::BIGINT)
177-
->setMaxResults(1)
178-
->execute()
179-
->fetch()
180-
;
181-
182-
return $result ?: null;
183-
}
184-
185156
private function redeliverMessages(): void
186157
{
187-
$this->dbal->createQueryBuilder()
188-
->update($this->context->getTableName())
158+
$this->getConnection()->createQueryBuilder()
159+
->update($this->getContext()->getTableName())
189160
->set('delivery_id', ':deliveryId')
190161
->set('redelivered', ':redelivered')
191162
->andWhere('delivery_id IS NOT NULL')

‎pkg/dbal/DbalConsumerHelperTrait.php

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Dbal;
6+
7+
use Doctrine\DBAL\Connection;
8+
use Doctrine\DBAL\Types\Type;
9+
10+
trait DbalConsumerHelperTrait
11+
{
12+
abstract public function getContext(): DbalContext;
13+
14+
abstract public function getConnection(): Connection;
15+
16+
protected function fetchMessage(array $queues): ?array
17+
{
18+
$now = time();
19+
20+
$result = $this->getConnection()->createQueryBuilder()
21+
->select('*')
22+
->from($this->getContext()->getTableName())
23+
->andWhere('delivery_id IS NULL')
24+
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
25+
->andWhere('queue IN (:queues)')
26+
->addOrderBy('priority', 'desc')
27+
->addOrderBy('published_at', 'asc')
28+
->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER)
29+
->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY)
30+
->setMaxResults(1)
31+
->execute()
32+
->fetch()
33+
;
34+
35+
return $result ?: null;
36+
}
37+
38+
protected function markMessageAsDeliveredToConsumer(array $message, string $deliveryId): void
39+
{
40+
$now = time();
41+
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
42+
43+
$this->getConnection()->createQueryBuilder()
44+
->andWhere('id = :id')
45+
->update($this->getContext()->getTableName())
46+
->set('delivery_id', ':deliveryId')
47+
->set('redeliver_after', ':redeliverAfter')
48+
->setParameter('id', $message['id'], Type::GUID)
49+
->setParameter('deliveryId', $deliveryId, Type::STRING)
50+
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
51+
->execute()
52+
;
53+
}
54+
55+
protected function getMessageByDeliveryId(string $deliveryId): array
56+
{
57+
return $this->getConnection()->createQueryBuilder()
58+
->select('*')
59+
->from($this->getContext()->getTableName())
60+
->andWhere('delivery_id = :deliveryId')
61+
->setParameter('deliveryId', $deliveryId, Type::STRING)
62+
->setMaxResults(1)
63+
->execute()
64+
->fetch()
65+
;
66+
}
67+
}

‎pkg/dbal/DbalContext.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,13 @@ public function close(): void
129129

130130
public function createSubscriptionConsumer(): SubscriptionConsumer
131131
{
132-
return new DbalSubscriptionConsumer($this);
132+
$consumer = new DbalSubscriptionConsumer($this);
133+
134+
if (isset($this->config['redelivery_delay'])) {
135+
$consumer->setRedeliveryDelay($this->config['redelivery_delay']);
136+
}
137+
138+
return $consumer;
133139
}
134140

135141
/**

‎pkg/dbal/DbalSubscriptionConsumer.php

+23-45
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
namespace Enqueue\Dbal;
66

7-
use Doctrine\DBAL\Types\Type;
7+
use Doctrine\DBAL\Connection;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\SubscriptionConsumer;
1010
use Ramsey\Uuid\Uuid;
1111

1212
class DbalSubscriptionConsumer implements SubscriptionConsumer
1313
{
14+
use DbalConsumerHelperTrait;
15+
1416
/**
1517
* @var DbalContext
1618
*/
@@ -47,6 +49,16 @@ public function __construct(DbalContext $context)
4749
$this->redeliveryDelay = 1200000;
4850
}
4951

52+
public function getContext(): DbalContext
53+
{
54+
return $this->context;
55+
}
56+
57+
public function getConnection(): Connection
58+
{
59+
return $this->dbal;
60+
}
61+
5062
/**
5163
* Get interval between retry failed messages in milliseconds.
5264
*/
@@ -55,6 +67,11 @@ public function getRedeliveryDelay(): int
5567
return $this->redeliveryDelay;
5668
}
5769

70+
public function setRedeliveryDelay(int $redeliveryDelay): void
71+
{
72+
$this->redeliveryDelay = $redeliveryDelay;
73+
}
74+
5875
public function consume(int $timeout = 0): void
5976
{
6077
if (empty($this->subscribers)) {
@@ -63,7 +80,6 @@ public function consume(int $timeout = 0): void
6380

6481
$now = time();
6582
$timeout /= 1000;
66-
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
6783
$deliveryId = (string) Uuid::uuid1();
6884

6985
$queueNames = [];
@@ -80,29 +96,11 @@ public function consume(int $timeout = 0): void
8096
$message = $this->fetchMessage($currentQueueNames);
8197

8298
if ($message) {
83-
// mark message as delivered to consumer
84-
$this->dbal->createQueryBuilder()
85-
->update($this->context->getTableName())
86-
->set('delivery_id', ':deliveryId')
87-
->set('redeliver_after', ':redeliverAfter')
88-
->andWhere('id = :id')
89-
->setParameter('id', $message['id'], Type::GUID)
90-
->setParameter('deliveryId', $deliveryId, Type::STRING)
91-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
92-
->execute()
93-
;
94-
95-
$message = $this->dbal->createQueryBuilder()
96-
->select('*')
97-
->from($this->context->getTableName())
98-
->andWhere('delivery_id = :deliveryId')
99-
->setParameter('deliveryId', $deliveryId, Type::STRING)
100-
->setMaxResults(1)
101-
->execute()
102-
->fetch()
103-
;
104-
105-
$dbalMessage = $this->context->convertMessage($message);
99+
$this->markMessageAsDeliveredToConsumer($message, $deliveryId);
100+
101+
$message = $this->getMessageByDeliveryId($deliveryId);
102+
103+
$dbalMessage = $this->getContext()->convertMessage($message);
106104

107105
/**
108106
* @var DbalConsumer
@@ -174,24 +172,4 @@ public function unsubscribeAll(): void
174172
{
175173
$this->subscribers = [];
176174
}
177-
178-
private function fetchMessage(array $queues): ?array
179-
{
180-
$result = $this->dbal->createQueryBuilder()
181-
->select('*')
182-
->from($this->context->getTableName())
183-
->andWhere('delivery_id IS NULL')
184-
->andWhere('queue IN (:queues)')
185-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
186-
->addOrderBy('priority', 'desc')
187-
->addOrderBy('published_at', 'asc')
188-
->setParameter('queues', array_keys($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY)
189-
->setParameter('delayedUntil', time(), \Doctrine\DBAL\ParameterType::INTEGER)
190-
->setMaxResults(1)
191-
->execute()
192-
->fetch()
193-
;
194-
195-
return $result ?: null;
196-
}
197175
}

0 commit comments

Comments
 (0)
Please sign in to comment.