Skip to content

Commit c5199ca

Browse files
committed
Migrate fs transport.
1 parent d30a3ac commit c5199ca

11 files changed

+124
-281
lines changed

Diff for: FsConnectionFactory.php

+4-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Fs;
44

55
use Interop\Queue\PsrConnectionFactory;
6+
use Interop\Queue\PsrContext;
67

78
class FsConnectionFactory implements PsrConnectionFactory
89
{
@@ -44,11 +45,9 @@ public function __construct($config = 'file:')
4445
}
4546

4647
/**
47-
* {@inheritdoc}
48-
*
4948
* @return FsContext
5049
*/
51-
public function createContext()
50+
public function createContext(): PsrContext
5251
{
5352
return new FsContext(
5453
$this->config['path'],
@@ -58,12 +57,7 @@ public function createContext()
5857
);
5958
}
6059

61-
/**
62-
* @param string $dsn
63-
*
64-
* @return array
65-
*/
66-
private function parseDsn($dsn)
60+
private function parseDsn(string $dsn): array
6761
{
6862
if ($dsn && '/' === $dsn[0]) {
6963
return ['path' => $dsn];
@@ -100,7 +94,7 @@ private function parseDsn($dsn)
10094
return $config;
10195
}
10296

103-
private function defaultConfig()
97+
private function defaultConfig(): array
10498
{
10599
return [
106100
'path' => null,

Diff for: FsConsumer.php

+26-49
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Interop\Queue\InvalidMessageException;
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
89

910
class FsConsumer implements PsrConsumer
1011
{
@@ -29,16 +30,13 @@ class FsConsumer implements PsrConsumer
2930
private $preFetchedMessages;
3031

3132
/**
32-
* @var int microseconds
33+
* In milliseconds.
34+
*
35+
* @var int
3336
*/
34-
private $pollingInterval = 100000;
37+
private $pollingInterval = 100;
3538

36-
/**
37-
* @param FsContext $context
38-
* @param FsDestination $destination
39-
* @param int $preFetchCount
40-
*/
41-
public function __construct(FsContext $context, FsDestination $destination, $preFetchCount)
39+
public function __construct(FsContext $context, FsDestination $destination, int $preFetchCount)
4240
{
4341
$this->context = $context;
4442
$this->destination = $destination;
@@ -49,40 +47,32 @@ public function __construct(FsContext $context, FsDestination $destination, $pre
4947

5048
/**
5149
* Set polling interval in milliseconds.
52-
*
53-
* @param int $msec
5450
*/
55-
public function setPollingInterval($msec)
51+
public function setPollingInterval(int $msec): void
5652
{
57-
$this->pollingInterval = $msec * 1000;
53+
$this->pollingInterval = $msec;
5854
}
5955

6056
/**
6157
* Get polling interval in milliseconds.
62-
*
63-
* @return int
6458
*/
65-
public function getPollingInterval()
59+
public function getPollingInterval(): int
6660
{
67-
return (int) $this->pollingInterval / 1000;
61+
return $this->pollingInterval;
6862
}
6963

7064
/**
71-
* {@inheritdoc}
72-
*
7365
* @return FsDestination
7466
*/
75-
public function getQueue()
67+
public function getQueue(): PsrQueue
7668
{
7769
return $this->destination;
7870
}
7971

8072
/**
81-
* {@inheritdoc}
82-
*
83-
* @return FsMessage|null
73+
* @return FsMessage
8474
*/
85-
public function receive($timeout = 0)
75+
public function receive(int $timeout = 0): ?PsrMessage
8676
{
8777
$timeout /= 1000;
8878
$startAt = microtime(true);
@@ -95,21 +85,21 @@ public function receive($timeout = 0)
9585
}
9686

9787
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
98-
return;
88+
return null;
9989
}
10090

10191
usleep($this->pollingInterval);
10292

10393
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
104-
return;
94+
return null;
10595
}
10696
}
10797
}
10898

10999
/**
110-
* {@inheritdoc}
100+
* @return FsMessage
111101
*/
112-
public function receiveNoWait()
102+
public function receiveNoWait(): ?PsrMessage
113103
{
114104
if ($this->preFetchedMessages) {
115105
return array_shift($this->preFetchedMessages);
@@ -140,15 +130,15 @@ public function receiveNoWait()
140130
$expireAt = $fetchedMessage->getHeader('x-expire-at');
141131
if ($expireAt && $expireAt - microtime(true) < 0) {
142132
// message has expired, just drop it.
143-
return;
133+
return null;
144134
}
145135

146136
$this->preFetchedMessages[] = $fetchedMessage;
147137
} catch (\Exception $e) {
148138
throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e);
149139
}
150140
} else {
151-
return;
141+
return null;
152142
}
153143

154144
--$count;
@@ -158,20 +148,16 @@ public function receiveNoWait()
158148
if ($this->preFetchedMessages) {
159149
return array_shift($this->preFetchedMessages);
160150
}
151+
152+
return null;
161153
}
162154

163-
/**
164-
* {@inheritdoc}
165-
*/
166-
public function acknowledge(PsrMessage $message)
155+
public function acknowledge(PsrMessage $message): void
167156
{
168157
// do nothing. fs transport always works in auto ack mode
169158
}
170159

171-
/**
172-
* {@inheritdoc}
173-
*/
174-
public function reject(PsrMessage $message, $requeue = false)
160+
public function reject(PsrMessage $message, bool $requeue = false): void
175161
{
176162
InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class);
177163

@@ -182,29 +168,20 @@ public function reject(PsrMessage $message, $requeue = false)
182168
}
183169
}
184170

185-
/**
186-
* @return int
187-
*/
188-
public function getPreFetchCount()
171+
public function getPreFetchCount(): int
189172
{
190173
return $this->preFetchCount;
191174
}
192175

193-
/**
194-
* @param int $preFetchCount
195-
*/
196-
public function setPreFetchCount($preFetchCount)
176+
public function setPreFetchCount(int $preFetchCount): void
197177
{
198178
$this->preFetchCount = $preFetchCount;
199179
}
200180

201181
/**
202182
* @param resource $file
203-
* @param int $frameNumber
204-
*
205-
* @return string
206183
*/
207-
private function readFrame($file, $frameNumber)
184+
private function readFrame($file, int $frameNumber): string
208185
{
209186
$frameSize = 64;
210187
$offset = $frameNumber * $frameSize;

0 commit comments

Comments
 (0)