Skip to content

Commit b8c9678

Browse files
authoredJun 26, 2017
Merge pull request #124 from php-enqueue/merge-producers-interfaces
[client] Merge experimental ProducerV2 methods to Producer interface.
2 parents 131ff9e + 95cdb28 commit b8c9678

40 files changed

+987
-1047
lines changed
 

‎docs/bundle/debuging.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ class DefaultController extends Controller
3535
/** @var ProducerInterface $producer */
3636
$producer = $this->get('enqueue.producer');
3737

38-
$producer->send('foo_topic', 'Hello world');
38+
$producer->sendEvent('foo_topic', 'Hello world');
3939

40-
$producer->send('bar_topic', ['bar' => 'val']);
40+
$producer->sendEvent('bar_topic', ['bar' => 'val']);
4141

4242
$message = new Message();
4343
$message->setBody('baz');
44-
$producer->send('baz_topic', $message);
44+
$producer->sendEvent('baz_topic', $message);
4545

4646
// ...
4747
}

‎docs/bundle/job_queue.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class Step1Processor implements PsrProcessor
8989
$runner->createDelayed(
9090
$jobName,
9191
function (JobRunner $runner, Job $childJob) use ($entity) {
92-
$this->producer->send('search:index:index-single-entity', [
92+
$this->producer->sendEvent('search:index:index-single-entity', [
9393
'entityId' => $entity->getId(),
9494
'jobId' => $childJob->getId(),
9595
]);

‎docs/bundle/message_producer.md

+33-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,13 @@ $context->createProducer()->send(
2424
The client is shipped with two types of producers. The first one sends messages immediately
2525
where another one (it is called spool producer) collects them in memory and sends them `onTerminate` event (the response is already sent).
2626

27+
The producer has two types on send methods:
2728

29+
* `sendEvent` - Message is sent to topic and many consumers can subscriber to it. It is "fire and forget" strategy. The even could be sent to "message bus" to other applications.
30+
* `sendCommand` - Message is to ONE exact consumer. It could be used as "fire and forget" or as RPC. The command message is always sent in scope of current application.
2831

32+
### Send event
33+
2934
```php
3035
<?php
3136

@@ -35,14 +40,40 @@ where another one (it is called spool producer) collects them in memory and send
3540
$producer = $container->get('enqueue.producer');
3641

3742
// message is being sent right now
38-
$producer->send('a_topic', 'Hello there!');
43+
$producer->sendEvent('a_topic', 'Hello there!');
44+
45+
46+
/** @var \Enqueue\Client\SpoolProducer $spoolProducer */
47+
$spoolProducer = $container->get('enqueue.spool_producer');
48+
49+
// message is being sent on console.terminate or kernel.terminate event
50+
$spoolProducer->sendEvent('a_topic', 'Hello there!');
51+
52+
// you could send queued messages manually by calling flush method
53+
$spoolProducer->flush();
54+
```
55+
56+
### Send command
57+
58+
```php
59+
<?php
60+
61+
/** @var Symfony\Component\DependencyInjection\ContainerInterface $container */
62+
63+
/** @var \Enqueue\Client\ProducerInterface $producer */
64+
$producer = $container->get('enqueue.producer');
65+
66+
// message is being sent right now, we use it as RPC
67+
$promise = $producer->sendCommand('a_processor_name', 'Hello there!', $needReply = true);
68+
69+
$replyMessage = $promise->receive();
3970

4071

4172
/** @var \Enqueue\Client\SpoolProducer $spoolProducer */
4273
$spoolProducer = $container->get('enqueue.spool_producer');
4374

4475
// message is being sent on console.terminate or kernel.terminate event
45-
$spoolProducer->send('a_topic', 'Hello there!');
76+
$spoolProducer->sendCommand('a_processor_name', 'Hello there!');
4677

4778
// you could send queued messages manually by calling flush method
4879
$spoolProducer->flush();

‎docs/bundle/quick_tour.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ use Enqueue\Client\Producer;
6060
/** @var Producer $producer **/
6161
$producer = $container->get('enqueue.producer');
6262

63-
$producer->send('aFooTopic', 'Something has happened');
63+
64+
// send event to many consumers
65+
$producer->sendEvent('aFooTopic', 'Something has happened');
66+
67+
// send command to ONE consumer
68+
$producer->sendCommand('aProcessorName', 'Something has happened');
6469
```
6570

6671
To consume messages you have to first create a message processor:

‎docs/client/message_examples.md

+28-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,28 @@
11
# Client. Message examples
2+
3+
* [Scope](#scope)
4+
* [Delay](#delay)
5+
* [Expiration (TTL)](#expiration-ttl)
6+
* [Priority](#priority)
7+
* [Timestamp, Content type, Message id](#timestamp-content-type-message-id)
8+
9+
## Scope
10+
11+
There are two two types possible scopes: `Message:SCOPE_MESSAGE_BUS` and `Message::SCOPE_APP`.
12+
The first one instructs the client send messages (if driver supports) to the message bus so other apps can consume those messages.
13+
The second in turns limits the message to the application that sent it. No other apps could receive it.
14+
15+
```php
16+
<?php
17+
18+
use Enqueue\Client\Message;
19+
20+
$message = new Message();
21+
$message->setScope(Message::SCOPE_MESSAGE_BUS);
22+
23+
/** @var \Enqueue\Client\ProducerInterface $producer */
24+
$producer->sendEvent('aTopic', $message);
25+
```
226

327
## Delay
428

@@ -15,7 +39,7 @@ $message = new Message();
1539
$message->setDelay(60); // seconds
1640

1741
/** @var \Enqueue\Client\ProducerInterface $producer */
18-
$producer->send('aTopic', $message);
42+
$producer->sendEvent('aTopic', $message);
1943
```
2044

2145
## Expiration (TTL)
@@ -33,7 +57,7 @@ $message = new Message();
3357
$message->setExpire(60); // seconds
3458

3559
/** @var \Enqueue\Client\ProducerInterface $producer */
36-
$producer->send('aTopic', $message);
60+
$producer->sendEvent('aTopic', $message);
3761
```
3862

3963
## Priority
@@ -52,7 +76,7 @@ $message = new Message();
5276
$message->setPriority(MessagePriority::HIGH);
5377

5478
/** @var \Enqueue\Client\ProducerInterface $producer */
55-
$producer->send('aTopic', $message);
79+
$producer->sendEvent('aTopic', $message);
5680
```
5781

5882
## Timestamp, Content type, Message id
@@ -72,7 +96,7 @@ $message->setTimestamp(time());
7296
$message->setContentType('text/plain');
7397

7498
/** @var \Enqueue\Client\ProducerInterface $producer */
75-
$producer->send('aTopic', $message);
99+
$producer->sendEvent('aTopic', $message);
76100
```
77101

78102
[back to index](../index.md)

‎docs/job_queue/run_sub_job.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RootJobProcessor implements PsrProcessor
2626
{
2727
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) {
2828
$runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) {
29-
$this->producer->send('aJobTopic', [
29+
$this->producer->sendEvent('aJobTopic', [
3030
'jobId' => $childJob->getId(),
3131
// other data required by sub job
3232
]);

‎docs/quick_tour.md

+47-5
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ It provides easy to use services for producing and processing messages.
163163
It supports unified format for setting message expiration, delay, timestamp, correlation id.
164164
It supports message bus so different applications can talk to each other.
165165

166-
Here's an example of how you can send and consume messages.
166+
Here's an example of how you can send and consume event messages.
167167

168168
```php
169169
<?php
@@ -179,15 +179,57 @@ $client = new SimpleClient('file://foo/bar');
179179
$client->setupBroker();
180180

181181
$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
182-
// your processing logic here
182+
// your event processor logic here
183183
});
184184

185-
$client->send('a_bar_topic', 'aMessageData');
186-
187-
// in another process you can consume messages.
185+
// this is a blocking call, it'll consume message until it is interrupted
188186
$client->consume();
189187
```
190188

189+
and command messages:
190+
191+
```php
192+
<?php
193+
use Enqueue\SimpleClient\SimpleClient;
194+
use Enqueue\Psr\PsrMessage;
195+
use Enqueue\Psr\PsrContext;
196+
use Enqueue\Client\Config;
197+
use Enqueue\Consumption\Extension\ReplyExtension;
198+
use Enqueue\Consumption\Result;
199+
200+
// composer require enqueue/amqp-ext
201+
$client = new SimpleClient('amqp://');
202+
203+
// composer require enqueue/fs
204+
$client = new SimpleClient('file://foo/bar');
205+
$client->
206+
207+
$client->setupBroker();
208+
209+
$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) {
210+
// your bar command processor logic here
211+
});
212+
213+
$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) {
214+
// your baz reply command processor logic here
215+
216+
return Result::reply($context->createMessage('theReplyBody'));
217+
});
218+
219+
// It is sent to one consumer.
220+
$client->sendCommand('bar_command', 'aMessageData');
221+
222+
// It is possible to get reply
223+
$promise = $client->sendCommand('bar_command', 'aMessageData', true);
224+
225+
// you can send several commands and only after start getting replies.
226+
227+
$replyMessage = $promise->receive(2000); // 2 sec
228+
229+
// this is a blocking call, it'll consume message until it is interrupted
230+
$client->consume([new ReplyExtension()]);
231+
```
232+
191233
## Cli commands
192234

193235
The library provides handy commands out of the box.

‎pkg/enqueue-bundle/Events/AsyncListener.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public function onEvent(Event $event = null, $eventName)
6060
$message->setProperty('event_name', $eventName);
6161
$message->setProperty('transformer_name', $transformerName);
6262

63-
$this->producer->send('event.'.$eventName, $message);
63+
$this->producer->sendEvent('event.'.$eventName, $message);
6464
}
6565
}
6666
}

‎pkg/enqueue-bundle/Resources/config/client.yml

+5-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ services:
77
class: 'Enqueue\Client\Producer'
88
arguments:
99
- '@enqueue.client.driver'
10+
- '@enqueue.client.rpc_factory'
1011
- '@enqueue.client.extensions'
1112

1213
enqueue.client.spool_producer:
@@ -24,18 +25,15 @@ services:
2425
alias: 'enqueue.client.producer'
2526

2627
enqueue.client.producer_v2:
27-
class: 'Enqueue\Client\ProducerV2'
28-
arguments:
29-
- '@enqueue.client.producer'
30-
- '@enqueue.client.rpc_client'
28+
alias: 'enqueue.client.producer'
3129

3230
enqueue.spool_producer:
3331
alias: 'enqueue.client.spool_producer'
3432

35-
enqueue.client.rpc_client:
36-
class: 'Enqueue\Client\RpcClient'
33+
enqueue.client.rpc_factory:
34+
class: 'Enqueue\Rpc\RpcFactory'
35+
public: false
3736
arguments:
38-
- '@enqueue.client.producer'
3937
- '@enqueue.transport.context'
4038

4139
enqueue.client.router_processor:

‎pkg/enqueue-bundle/Resources/config/services.yml

+7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@ services:
1818
tags:
1919
- { name: 'console.command' }
2020

21+
enqueue.transport.rpc_factory:
22+
class: 'Enqueue\Rpc\RpcFactory'
23+
public: false
24+
arguments:
25+
- '@enqueue.transport.context'
26+
2127
enqueue.transport.rpc_client:
2228
class: 'Enqueue\Rpc\RpcClient'
2329
arguments:
2430
- '@enqueue.transport.context'
31+
- '@enqueue.transport.rpc_factory'

0 commit comments

Comments
 (0)