Skip to content

Commit 205c877

Browse files
committed
receiveNoWait
1 parent e1f5231 commit 205c877

File tree

6 files changed

+698
-174
lines changed

6 files changed

+698
-174
lines changed

pkg/enqueue/Client/RpcClient.php

+59-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\Psr\PsrContext;
66
use Enqueue\Psr\PsrMessage;
77
use Enqueue\Rpc\Promise;
8+
use Enqueue\Rpc\TimeoutException;
89
use Enqueue\Util\UUID;
910

1011
class RpcClient
@@ -38,7 +39,7 @@ public function __construct(ProducerInterface $producer, PsrContext $context)
3839
*/
3940
public function call($topic, $message, $timeout)
4041
{
41-
return $this->callAsync($topic, $message, $timeout)->getMessage();
42+
return $this->callAsync($topic, $message, $timeout)->receive();
4243
}
4344

4445
/**
@@ -62,9 +63,11 @@ public function callAsync($topic, $message, $timeout)
6263

6364
if ($message->getReplyTo()) {
6465
$replyQueue = $this->context->createQueue($message->getReplyTo());
66+
$deleteReplyQueue = false;
6567
} else {
6668
$replyQueue = $this->context->createTemporaryQueue();
6769
$message->setReplyTo($replyQueue->getQueueName());
70+
$deleteReplyQueue = true;
6871
}
6972

7073
if (false == $message->getCorrelationId()) {
@@ -73,10 +76,60 @@ public function callAsync($topic, $message, $timeout)
7376

7477
$this->producer->send($topic, $message);
7578

76-
return new Promise(
77-
$this->context->createConsumer($replyQueue),
78-
$message->getCorrelationId(),
79-
$timeout
80-
);
79+
$correlationId = $message->getCorrelationId();
80+
81+
$receive = function() use ($replyQueue, $timeout, $correlationId) {
82+
83+
$endTime = time() + ((int) ($timeout / 1000));
84+
$consumer = $this->context->createConsumer($replyQueue);
85+
86+
do {
87+
if ($message = $consumer->receive($timeout)) {
88+
if ($message->getCorrelationId() === $correlationId) {
89+
$consumer->acknowledge($message);
90+
91+
return $message;
92+
}
93+
94+
$consumer->reject($message, true);
95+
}
96+
} while (time() < $endTime);
97+
98+
throw TimeoutException::create($timeout, $correlationId);
99+
};
100+
101+
$receiveNoWait = function() use ($replyQueue, $correlationId) {
102+
103+
static $consumer;
104+
105+
if (null === $consumer) {
106+
$consumer = $this->context->createConsumer($replyQueue);
107+
}
108+
109+
if ($message = $consumer->receiveNoWait()) {
110+
if ($message->getCorrelationId() === $correlationId) {
111+
$consumer->acknowledge($message);
112+
113+
return $message;
114+
}
115+
116+
$consumer->reject($message, true);
117+
}
118+
};
119+
120+
$finally = function(Promise $promise) use ($replyQueue) {
121+
if ($promise->isDeleteReplyQueue()) {
122+
if (false == method_exists($this->context, 'deleteQueue')) {
123+
throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context)));
124+
}
125+
126+
$this->context->deleteQueue($replyQueue);
127+
}
128+
};
129+
130+
$promise = new Promise($receive, $receiveNoWait, $finally);
131+
$promise->setDeleteReplyQueue($deleteReplyQueue);
132+
133+
return $promise;
81134
}
82135
}

pkg/enqueue/Rpc/Promise.php

+58-10
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ class Promise
1111
*/
1212
private $receiveCallback;
1313

14+
/**
15+
* @var \Closure
16+
*/
17+
private $receiveNoWaitCallback;
18+
1419
/**
1520
* @var \Closure
1621
*/
@@ -21,37 +26,80 @@ class Promise
2126
*/
2227
private $deleteReplyQueue;
2328

29+
/**
30+
* @var PsrMessage
31+
*/
32+
private $message;
33+
2434
/**
2535
* @param \Closure $receiveCallback
36+
* @param \Closure $receiveNoWaitCallback
2637
* @param \Closure $finallyCallback
2738
*/
28-
public function __construct(\Closure $receiveCallback, \Closure $finallyCallback)
39+
public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCallback, \Closure $finallyCallback)
2940
{
3041
$this->receiveCallback = $receiveCallback;
42+
$this->receiveNoWaitCallback = $receiveNoWaitCallback;
3143
$this->finallyCallback = $finallyCallback;
3244

3345
$this->deleteReplyQueue = true;
3446
}
3547

3648
/**
49+
* Blocks until message received or timeout expired.
50+
*
3751
* @throws TimeoutException if the wait timeout is reached
3852
*
3953
* @return PsrMessage
4054
*/
41-
public function getMessage()
55+
public function receive()
4256
{
43-
try {
44-
$result = call_user_func($this->receiveCallback, $this);
57+
if (null == $this->message) {
58+
try {
59+
if ($message = $this->doReceive($this->receiveCallback)) {
60+
$this->message = $message;
61+
}
62+
} finally {
63+
call_user_func($this->finallyCallback, $this);
64+
}
65+
}
4566

46-
if (false == $result instanceof PsrMessage) {
47-
throw new \LogicException(sprintf(
48-
'Expected "%s" but got: "%s"', PsrMessage::class, is_object($result) ? get_class($result) : gettype($result)));
67+
return $this->message;
68+
}
69+
70+
/**
71+
* Non blocking function. Returns message or null.
72+
*
73+
* @return PsrMessage|null
74+
*/
75+
public function receiveNoWait()
76+
{
77+
if (null == $this->message) {
78+
if ($message = $this->doReceive($this->receiveNoWaitCallback)) {
79+
$this->message = $message;
80+
81+
call_user_func($this->finallyCallback, $this);
4982
}
83+
}
5084

51-
return $result;
52-
} finally {
53-
call_user_func($this->finallyCallback, $this);
85+
return $this->message;
86+
}
87+
88+
/**
89+
* @param \Closure $cb
90+
*
91+
* @return PsrMessage
92+
*/
93+
private function doReceive(\Closure $cb)
94+
{
95+
$message = call_user_func($cb, $this);
96+
97+
if (null !== $message && false == $message instanceof PsrMessage) {
98+
throw new \RuntimeException(sprintf(
99+
'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message)));
54100
}
101+
102+
return $message;
55103
}
56104

57105
/**

pkg/enqueue/Rpc/RpcClient.php

+22-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function __construct(PsrContext $context)
3333
*/
3434
public function call(PsrDestination $destination, PsrMessage $message, $timeout)
3535
{
36-
return $this->callAsync($destination, $message, $timeout)->getMessage();
36+
return $this->callAsync($destination, $message, $timeout)->receive();
3737
}
3838

3939
/**
@@ -86,17 +86,36 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim
8686
throw TimeoutException::create($timeout, $correlationId);
8787
};
8888

89+
$receiveNoWait = function() use ($replyQueue, $correlationId) {
90+
91+
static $consumer;
92+
93+
if (null === $consumer) {
94+
$consumer = $this->context->createConsumer($replyQueue);
95+
}
96+
97+
if ($message = $consumer->receiveNoWait()) {
98+
if ($message->getCorrelationId() === $correlationId) {
99+
$consumer->acknowledge($message);
100+
101+
return $message;
102+
}
103+
104+
$consumer->reject($message, true);
105+
}
106+
};
107+
89108
$finally = function(Promise $promise) use ($replyQueue) {
90109
if ($promise->isDeleteReplyQueue()) {
91110
if (false == method_exists($this->context, 'deleteQueue')) {
92-
throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context)));
111+
throw new \RuntimeException(sprintf('Context does not support delete queue: "%s"', get_class($this->context)));
93112
}
94113

95114
$this->context->deleteQueue($replyQueue);
96115
}
97116
};
98117

99-
$promise = new Promise($receive, $finally);
118+
$promise = new Promise($receive, $receiveNoWait, $finally);
100119
$promise->setDeleteReplyQueue($deleteReplyQueue);
101120

102121
return $promise;

0 commit comments

Comments
 (0)