1
1
<?php
2
2
3
+ declare (strict_types=1 );
4
+
3
5
namespace Enqueue \Client \Driver ;
4
6
5
7
use Enqueue \Client \Config ;
6
- use Enqueue \Client \DriverInterface ;
7
8
use Enqueue \Client \Message ;
8
- use Enqueue \Client \Meta \QueueMetaRegistry ;
9
+ use Enqueue \Client \MessagePriority ;
10
+ use Enqueue \Client \RouteCollection ;
9
11
use Interop \Amqp \AmqpContext ;
10
12
use Interop \Amqp \AmqpMessage ;
11
13
use Interop \Amqp \AmqpQueue ;
12
14
use Interop \Amqp \AmqpTopic ;
13
15
use Interop \Amqp \Impl \AmqpBind ;
14
16
use Interop \Queue \PsrMessage ;
15
17
use Interop \Queue \PsrQueue ;
18
+ use Interop \Queue \PsrTopic ;
16
19
use Psr \Log \LoggerInterface ;
17
20
use Psr \Log \NullLogger ;
18
21
19
- class AmqpDriver implements DriverInterface
22
+ class AmqpDriver extends GenericDriver
20
23
{
21
24
/**
22
25
* @var AmqpContext
@@ -29,43 +32,59 @@ class AmqpDriver implements DriverInterface
29
32
private $ config ;
30
33
31
34
/**
32
- * @var QueueMetaRegistry
35
+ * @var array
36
+ */
37
+ private $ priorityMap ;
38
+
39
+ /**
40
+ * @var RouteCollection
33
41
*/
34
- private $ queueMetaRegistry ;
42
+ private $ routeCollection ;
35
43
36
- public function __construct (AmqpContext $ context , Config $ config , QueueMetaRegistry $ queueMetaRegistry )
44
+ public function __construct (AmqpContext $ context , Config $ config , RouteCollection $ routeCollection )
37
45
{
38
46
$ this ->context = $ context ;
39
47
$ this ->config = $ config ;
40
- $ this ->queueMetaRegistry = $ queueMetaRegistry ;
41
- }
48
+ $ this ->routeCollection = $ routeCollection ;
42
49
43
- public function sendToRouter (Message $ message ): void
44
- {
45
- if (false == $ message ->getProperty (Config::PARAMETER_TOPIC_NAME )) {
46
- throw new \LogicException ('Topic name parameter is required but is not set ' );
47
- }
48
-
49
- $ topic = $ this ->createRouterTopic ();
50
- $ transportMessage = $ this ->createTransportMessage ($ message );
50
+ $ this ->priorityMap = [
51
+ MessagePriority::VERY_LOW => 0 ,
52
+ MessagePriority::LOW => 1 ,
53
+ MessagePriority::NORMAL => 2 ,
54
+ MessagePriority::HIGH => 3 ,
55
+ MessagePriority::VERY_HIGH => 4 ,
56
+ ];
51
57
52
- $ this -> context -> createProducer ()-> send ( $ topic , $ transportMessage );
58
+ parent :: __construct ( $ context, $ config , $ routeCollection );
53
59
}
54
60
55
- public function sendToProcessor (Message $ message ): void
61
+ /**
62
+ * @return AmqpMessage
63
+ */
64
+ public function createTransportMessage (Message $ clientMessage ): PsrMessage
56
65
{
57
- if (false == $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
58
- throw new \LogicException ('Processor name parameter is required but is not set ' );
59
- }
66
+ /** @var AmqpMessage $transportMessage */
67
+ $ transportMessage = parent ::createTransportMessage ($ clientMessage );
68
+ $ transportMessage ->setDeliveryMode (AmqpMessage::DELIVERY_MODE_PERSISTENT );
69
+ $ transportMessage ->setContentType ($ clientMessage ->getContentType ());
60
70
61
- if (false == $ queueName = $ message -> getProperty (Config:: PARAMETER_PROCESSOR_QUEUE_NAME )) {
62
- throw new \ LogicException ( ' Queue name parameter is required but is not set ' );
71
+ if ($ clientMessage -> getExpire ( )) {
72
+ $ transportMessage -> setExpiration ( $ clientMessage -> getExpire () * 1000 );
63
73
}
64
74
65
- $ transportMessage = $ this ->createTransportMessage ($ message );
66
- $ destination = $ this ->createQueue ($ queueName );
75
+ if ($ priority = $ clientMessage ->getPriority ()) {
76
+ if (false == array_key_exists ($ priority , $ this ->getPriorityMap ())) {
77
+ throw new \InvalidArgumentException (sprintf (
78
+ 'Cant convert client priority "%s" to transport one. Could be one of "%s" ' ,
79
+ $ priority ,
80
+ implode ('", " ' , array_keys ($ this ->getPriorityMap ()))
81
+ ));
82
+ }
67
83
68
- $ this ->context ->createProducer ()->send ($ destination , $ transportMessage );
84
+ $ transportMessage ->setPriority ($ this ->priorityMap [$ priority ]);
85
+ }
86
+
87
+ return $ transportMessage ;
69
88
}
70
89
71
90
public function setupBroker (LoggerInterface $ logger = null ): void
@@ -77,100 +96,73 @@ public function setupBroker(LoggerInterface $logger = null): void
77
96
78
97
// setup router
79
98
$ routerTopic = $ this ->createRouterTopic ();
80
- $ routerQueue = $ this ->createQueue ($ this ->config ->getRouterQueueName ());
81
-
82
99
$ log ('Declare router exchange: %s ' , $ routerTopic ->getTopicName ());
83
100
$ this ->context ->declareTopic ($ routerTopic );
101
+
102
+ $ routerQueue = $ this ->createQueue ($ this ->config ->getRouterQueueName ());
84
103
$ log ('Declare router queue: %s ' , $ routerQueue ->getQueueName ());
85
104
$ this ->context ->declareQueue ($ routerQueue );
105
+
86
106
$ log ('Bind router queue to exchange: %s -> %s ' , $ routerQueue ->getQueueName (), $ routerTopic ->getTopicName ());
87
107
$ this ->context ->bind (new AmqpBind ($ routerTopic , $ routerQueue , $ routerQueue ->getQueueName ()));
88
108
89
109
// setup queues
90
- foreach ($ this ->queueMetaRegistry ->getQueuesMeta () as $ meta ) {
91
- $ queue = $ this ->createQueue ($ meta ->getClientName ());
110
+ $ declaredQueues = [];
111
+ foreach ($ this ->routeCollection ->all () as $ route ) {
112
+ /** @var AmqpQueue $queue */
113
+ $ queue = $ this ->createRouteQueue ($ route );
114
+ if (array_key_exists ($ queue ->getQueueName (), $ declaredQueues )) {
115
+ continue ;
116
+ }
92
117
93
118
$ log ('Declare processor queue: %s ' , $ queue ->getQueueName ());
94
119
$ this ->context ->declareQueue ($ queue );
120
+
121
+ $ declaredQueues [$ queue ->getQueueName ()] = true ;
95
122
}
96
123
}
97
124
98
125
/**
99
126
* @return AmqpQueue
100
127
*/
101
- public function createQueue (string $ queueName ): PsrQueue
128
+ public function createQueue (string $ clientQueuName ): PsrQueue
102
129
{
103
- $ transportName = $ this ->queueMetaRegistry ->getQueueMeta ($ queueName )->getTransportName ();
104
-
105
- $ queue = $ this ->context ->createQueue ($ transportName );
130
+ /** @var AmqpQueue $queue */
131
+ $ queue = parent ::createQueue ($ clientQueuName );
106
132
$ queue ->addFlag (AmqpQueue::FLAG_DURABLE );
107
133
108
134
return $ queue ;
109
135
}
110
136
111
137
/**
112
- * @return AmqpMessage
138
+ * @param AmqpTopic $topic
139
+ * @param AmqpMessage $transportMessage
113
140
*/
114
- public function createTransportMessage ( Message $ message ): PsrMessage
141
+ protected function doSendToRouter ( PsrTopic $ topic , PsrMessage $ transportMessage ): void
115
142
{
116
- $ headers = $ message ->getHeaders ();
117
- $ properties = $ message ->getProperties ();
118
-
119
- $ transportMessage = $ this ->context ->createMessage ();
120
- $ transportMessage ->setBody ($ message ->getBody ());
121
- $ transportMessage ->setHeaders ($ headers );
122
- $ transportMessage ->setProperties ($ properties );
123
- $ transportMessage ->setMessageId ($ message ->getMessageId ());
124
- $ transportMessage ->setTimestamp ($ message ->getTimestamp ());
125
- $ transportMessage ->setReplyTo ($ message ->getReplyTo ());
126
- $ transportMessage ->setCorrelationId ($ message ->getCorrelationId ());
127
- $ transportMessage ->setContentType ($ message ->getContentType ());
128
- $ transportMessage ->setDeliveryMode (AmqpMessage::DELIVERY_MODE_PERSISTENT );
129
-
130
- if ($ message ->getExpire ()) {
131
- $ transportMessage ->setExpiration ($ message ->getExpire () * 1000 );
132
- }
143
+ // We should not handle priority, expiration, and delay at this stage.
144
+ // The router will take care of it while re-sending the message to the final destinations.
145
+ $ transportMessage ->setPriority (null );
146
+ $ transportMessage ->setExpiration (null );
133
147
134
- return $ transportMessage ;
148
+ $ this -> context -> createProducer ()-> send ( $ topic , $ transportMessage) ;
135
149
}
136
150
137
151
/**
138
- * @param AmqpMessage $message
152
+ * @return AmqpTopic
139
153
*/
140
- public function createClientMessage (PsrMessage $ message ): Message
141
- {
142
- $ clientMessage = new Message ();
143
-
144
- $ clientMessage ->setBody ($ message ->getBody ());
145
- $ clientMessage ->setHeaders ($ message ->getHeaders ());
146
- $ clientMessage ->setProperties ($ message ->getProperties ());
147
- $ clientMessage ->setContentType ($ message ->getContentType ());
148
-
149
- if ($ expiration = $ message ->getExpiration ()) {
150
- $ clientMessage ->setExpire ((int ) ($ expiration / 1000 ));
151
- }
152
-
153
- $ clientMessage ->setMessageId ($ message ->getMessageId ());
154
- $ clientMessage ->setTimestamp ($ message ->getTimestamp ());
155
- $ clientMessage ->setReplyTo ($ message ->getReplyTo ());
156
- $ clientMessage ->setCorrelationId ($ message ->getCorrelationId ());
157
-
158
- return $ clientMessage ;
159
- }
160
-
161
- public function getConfig (): Config
154
+ protected function createRouterTopic (): PsrTopic
162
155
{
163
- return $ this ->config ;
164
- }
165
-
166
- private function createRouterTopic (): AmqpTopic
167
- {
168
- $ topic = $ this ->context ->createTopic (
169
- $ this ->config ->createTransportRouterTopicName ($ this ->config ->getRouterTopicName ())
170
- );
156
+ /** @var AmqpTopic $topic */
157
+ $ topic = parent ::createRouterTopic ();
171
158
$ topic ->setType (AmqpTopic::TYPE_FANOUT );
172
159
$ topic ->addFlag (AmqpTopic::FLAG_DURABLE );
173
160
174
161
return $ topic ;
175
162
}
163
+
164
+ protected function getPriorityMap (): array
165
+ {
166
+ return $ this ->priorityMap ;
167
+ }
176
168
}
0 commit comments