Skip to content

Commit c5cb914

Browse files
authoredSep 8, 2017
Merge pull request #192 from php-enqueue/fs-polling-interval
[FS] Polling Interval
2 parents 8364a98 + 337bb00 commit c5cb914

9 files changed

+106
-10
lines changed
 

‎bin/test

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ function waitForService()
2323
waitForService rabbitmq 5672 50
2424
waitForService mysql 3306 50
2525
waitForService redis 6379 50
26-
waitForService beanstalkd 11300
27-
waitForService gearmand 4730
28-
waitForService kafka 9092
26+
waitForService beanstalkd 11300 50
27+
waitForService gearmand 4730 50
28+
waitForService kafka 9092 50
2929

3030
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
3131
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

‎pkg/fs/FsConnectionFactory.php

+8-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class FsConnectionFactory implements PsrConnectionFactory
1818
* 'path' => 'the directory where all queue\topic files remain. For example /home/foo/enqueue',
1919
* 'pre_fetch_count' => 'Integer. Defines how many messages to fetch from the file.',
2020
* 'chmod' => 'Defines a mode the files are created with',
21+
* 'polling_interval' => 'How often query for new messages, default 100 (milliseconds)',
2122
* ]
2223
*
2324
* or
@@ -48,7 +49,12 @@ public function __construct($config = 'file://')
4849
*/
4950
public function createContext()
5051
{
51-
return new FsContext($this->config['path'], $this->config['pre_fetch_count'], $this->config['chmod']);
52+
return new FsContext(
53+
$this->config['path'],
54+
$this->config['pre_fetch_count'],
55+
$this->config['chmod'],
56+
$this->config['polling_interval']
57+
);
5258
}
5359

5460
/**
@@ -99,6 +105,7 @@ private function defaultConfig()
99105
'path' => null,
100106
'pre_fetch_count' => 1,
101107
'chmod' => 0600,
108+
'polling_interval' => 100,
102109
];
103110
}
104111
}

‎pkg/fs/FsConsumer.php

+41-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ class FsConsumer implements PsrConsumer
2828
*/
2929
private $preFetchedMessages;
3030

31+
/**
32+
* @var int microseconds
33+
*/
34+
private $pollingInterval = 100000;
35+
3136
/**
3237
* @param FsContext $context
3338
* @param FsDestination $destination
@@ -42,6 +47,26 @@ public function __construct(FsContext $context, FsDestination $destination, $pre
4247
$this->preFetchedMessages = [];
4348
}
4449

50+
/**
51+
* Set polling interval in milliseconds.
52+
*
53+
* @param int $msec
54+
*/
55+
public function setPollingInterval($msec)
56+
{
57+
$this->pollingInterval = $msec * 1000;
58+
}
59+
60+
/**
61+
* Get polling interval in milliseconds.
62+
*
63+
* @return int
64+
*/
65+
public function getPollingInterval()
66+
{
67+
return (int) $this->pollingInterval / 1000;
68+
}
69+
4570
/**
4671
* {@inheritdoc}
4772
*
@@ -59,13 +84,25 @@ public function getQueue()
5984
*/
6085
public function receive($timeout = 0)
6186
{
62-
$end = microtime(true) + ($timeout / 1000);
63-
while (0 === $timeout || microtime(true) < $end) {
64-
if ($message = $this->receiveNoWait()) {
87+
$timeout /= 1000;
88+
$startAt = microtime(true);
89+
90+
while (true) {
91+
$message = $this->receiveNoWait();
92+
93+
if ($message) {
6594
return $message;
6695
}
6796

68-
usleep(100);
97+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
98+
return;
99+
}
100+
101+
usleep($this->pollingInterval);
102+
103+
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
104+
return;
105+
}
69106
}
70107
}
71108

‎pkg/fs/FsContext.php

+15-2
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,26 @@ class FsContext implements PsrContext
3232
*/
3333
private $lockHandlers;
3434

35+
/**
36+
* @var null
37+
*/
38+
private $pollingInterval;
39+
3540
/**
3641
* @param string $storeDir
3742
* @param int $preFetchCount
3843
* @param int $chmod
44+
* @param null $pollingInterval
3945
*/
40-
public function __construct($storeDir, $preFetchCount, $chmod)
46+
public function __construct($storeDir, $preFetchCount, $chmod, $pollingInterval = null)
4147
{
4248
$fs = new Filesystem();
4349
$fs->mkdir($storeDir);
4450

4551
$this->storeDir = $storeDir;
4652
$this->preFetchCount = $preFetchCount;
4753
$this->chmod = $chmod;
54+
$this->pollingInterval = $pollingInterval;
4855

4956
$this->lockHandlers = [];
5057
}
@@ -160,7 +167,13 @@ public function createConsumer(PsrDestination $destination)
160167
{
161168
InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class);
162169

163-
return new FsConsumer($this, $destination, $this->preFetchCount);
170+
$consumer = new FsConsumer($this, $destination, $this->preFetchCount);
171+
172+
if ($this->pollingInterval) {
173+
$consumer->setPollingInterval($this->pollingInterval);
174+
}
175+
176+
return $consumer;
164177
}
165178

166179
public function close()

‎pkg/fs/Symfony/FsTransportFactory.php

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
5656
->defaultValue(0600)
5757
->info('The queue files are created with this given permissions if not exist.')
5858
->end()
59+
->integerNode('polling_interval')
60+
->defaultValue(100)
61+
->min(50)
62+
->info('How often query for new messages.')
63+
->end()
5964
;
6065
}
6166

‎pkg/fs/Tests/FsConnectionFactoryConfigTest.php

+7
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public static function provideConfigs()
5858
'path' => sys_get_temp_dir().'/enqueue',
5959
'pre_fetch_count' => 1,
6060
'chmod' => 0600,
61+
'polling_interval' => 100,
6162
],
6263
];
6364

@@ -67,6 +68,7 @@ public static function provideConfigs()
6768
'path' => sys_get_temp_dir().'/enqueue',
6869
'pre_fetch_count' => 1,
6970
'chmod' => 0600,
71+
'polling_interval' => 100,
7072
],
7173
];
7274

@@ -76,6 +78,7 @@ public static function provideConfigs()
7678
'path' => sys_get_temp_dir().'/enqueue',
7779
'pre_fetch_count' => 1,
7880
'chmod' => 0600,
81+
'polling_interval' => 100,
7982
],
8083
];
8184

@@ -85,6 +88,7 @@ public static function provideConfigs()
8588
'path' => sys_get_temp_dir().'/enqueue',
8689
'pre_fetch_count' => 1,
8790
'chmod' => 0600,
91+
'polling_interval' => 100,
8892
],
8993
];
9094

@@ -94,6 +98,7 @@ public static function provideConfigs()
9498
'path' => '/foo/bar/baz',
9599
'pre_fetch_count' => 1,
96100
'chmod' => 0600,
101+
'polling_interval' => 100,
97102
],
98103
];
99104

@@ -103,6 +108,7 @@ public static function provideConfigs()
103108
'path' => '/foo/bar/baz',
104109
'pre_fetch_count' => 1,
105110
'chmod' => 0600,
111+
'polling_interval' => 100,
106112
],
107113
];
108114

@@ -112,6 +118,7 @@ public static function provideConfigs()
112118
'path' => '/foo/bar/baz',
113119
'pre_fetch_count' => 100,
114120
'chmod' => 0666,
121+
'polling_interval' => 100,
115122
],
116123
];
117124
}

‎pkg/fs/Tests/FsConsumerTest.php

+8
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public function testShouldDoNothingOnReject()
6464
$consumer->reject(new FsMessage());
6565
}
6666

67+
public function testCouldSetAndGetPollingInterval()
68+
{
69+
$consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
70+
$consumer->setPollingInterval(123456);
71+
72+
$this->assertEquals(123456, $consumer->getPollingInterval());
73+
}
74+
6775
public function testShouldSendSameMessageToDestinationOnReQueue()
6876
{
6977
$message = new FsMessage();

‎pkg/fs/Tests/FsContextTest.php

+14
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,18 @@ public function testShouldCreateFileOnFilesystemIfNotExistOnDeclareDestination()
233233

234234
unlink($tmpFile);
235235
}
236+
237+
public function testShouldCreateMessageConsumerAndSetPollingInterval()
238+
{
239+
$tmpFile = new TempFile(sys_get_temp_dir().'/foo');
240+
241+
$context = new FsContext(sys_get_temp_dir(), 1, 0666, 123456);
242+
243+
$queue = $context->createQueue($tmpFile->getFilename());
244+
245+
$consumer = $context->createConsumer($queue);
246+
247+
$this->assertInstanceOf(FsConsumer::class, $consumer);
248+
$this->assertEquals(123456, $consumer->getPollingInterval());
249+
}
236250
}

‎pkg/fs/Tests/Symfony/FsTransportFactoryTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public function testShouldAllowAddConfiguration()
5252
'path' => sys_get_temp_dir(),
5353
'pre_fetch_count' => 1,
5454
'chmod' => 0600,
55+
'polling_interval' => 100,
5556
], $config);
5657
}
5758

@@ -69,6 +70,7 @@ public function testShouldAllowAddConfigurationAsString()
6970
'dsn' => 'fileDSN',
7071
'pre_fetch_count' => 1,
7172
'chmod' => 0600,
73+
'polling_interval' => 100,
7274
], $config);
7375
}
7476

@@ -82,6 +84,7 @@ public function testShouldCreateConnectionFactory()
8284
'path' => sys_get_temp_dir(),
8385
'pre_fetch_count' => 1,
8486
'chmod' => 0600,
87+
'polling_interval' => 100,
8588
]);
8689

8790
$this->assertTrue($container->hasDefinition($serviceId));
@@ -91,6 +94,7 @@ public function testShouldCreateConnectionFactory()
9194
'path' => sys_get_temp_dir(),
9295
'pre_fetch_count' => 1,
9396
'chmod' => 0600,
97+
'polling_interval' => 100,
9498
]], $factory->getArguments());
9599
}
96100

@@ -120,6 +124,7 @@ public function testShouldCreateContext()
120124
'path' => sys_get_temp_dir(),
121125
'pre_fetch_count' => 1,
122126
'chmod' => 0600,
127+
'polling_interval' => 100,
123128
]);
124129

125130
$this->assertEquals('enqueue.transport.fs.context', $serviceId);

0 commit comments

Comments
 (0)
Please sign in to comment.