diff --git a/pkg/stomp/StompConnectionFactory.php b/pkg/stomp/StompConnectionFactory.php index 764145ca2..818283493 100644 --- a/pkg/stomp/StompConnectionFactory.php +++ b/pkg/stomp/StompConnectionFactory.php @@ -8,6 +8,8 @@ use Interop\Queue\ConnectionFactory; use Interop\Queue\Context; use Stomp\Network\Connection; +use Stomp\Network\Observer\HeartbeatEmitter; +use Stomp\Network\Observer\ServerAliveObserver; class StompConnectionFactory implements ConnectionFactory { @@ -88,11 +90,22 @@ private function establishConnection(): BufferedStompClient $scheme = (true === $config['ssl_on']) ? 'ssl' : 'tcp'; $uri = $scheme.'://'.$config['host'].':'.$config['port']; $connection = new Connection($uri, $config['connection_timeout']); + $connection->setWriteTimeout($config['write_timeout']); + $connection->setReadTimeout($config['read_timeout']); + + if ($config['send_heartbeat']) { + $connection->getObservers()->addObserver(new HeartbeatEmitter($connection)); + } + + if ($config['receive_heartbeat']) { + $connection->getObservers()->addObserver(new ServerAliveObserver()); + } $this->stomp = new BufferedStompClient($connection, $config['buffer_size']); $this->stomp->setLogin($config['login'], $config['password']); $this->stomp->setVhostname($config['vhost']); $this->stomp->setSync($config['sync']); + $this->stomp->setHeartbeat($config['send_heartbeat'], $config['receive_heartbeat']); $this->stomp->connect(); } @@ -128,6 +141,10 @@ private function parseDsn(string $dsn): array 'sync' => $dsn->getBool('sync'), 'lazy' => $dsn->getBool('lazy'), 'ssl_on' => $dsn->getBool('ssl_on'), + 'write_timeout' => $dsn->getDecimal('write_timeout'), + 'read_timeout' => $dsn->getDecimal('read_timeout'), + 'send_heartbeat' => $dsn->getDecimal('send_heartbeat'), + 'receive_heartbeat' => $dsn->getDecimal('receive_heartbeat'), ]), function ($value) { return null !== $value; }); } @@ -145,6 +162,10 @@ private function defaultConfig(): array 'sync' => false, 'lazy' => true, 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ]; } } diff --git a/pkg/stomp/Tests/Functional/StompConnectionFactoryTest.php b/pkg/stomp/Tests/Functional/StompConnectionFactoryTest.php new file mode 100644 index 000000000..9029c17c1 --- /dev/null +++ b/pkg/stomp/Tests/Functional/StompConnectionFactoryTest.php @@ -0,0 +1,49 @@ +getDsn().'?send_heartbeat=2000'; + $factory = new StompConnectionFactory($dsn); + $this->expectException(HeartbeatException::class); + $factory->createContext()->getStomp(); + } + + public function testShouldCreateConnectionWithSendHeartbeat() + { + $dsn = $this->getDsn().'?send_heartbeat=2000&read_timeout=1'; + $factory = new StompConnectionFactory($dsn); + $context = $factory->createContext(); + + $observers = $context->getStomp()->getConnection()->getObservers()->getObservers(); + $this->assertAttributeEquals([2000, 0], 'heartbeat', $context->getStomp()); + $this->assertCount(1, $observers); + $this->assertInstanceOf(HeartbeatEmitter::class, $observers[0]); + } + + public function testShouldCreateConnectionWithReceiveHeartbeat() + { + $dsn = $this->getDsn().'?receive_heartbeat=2000'; + $factory = new StompConnectionFactory($dsn); + $context = $factory->createContext(); + + $observers = $context->getStomp()->getConnection()->getObservers()->getObservers(); + $this->assertAttributeEquals([0, 2000], 'heartbeat', $context->getStomp()); + $this->assertCount(1, $observers); + $this->assertInstanceOf(ServerAliveObserver::class, $observers[0]); + } +} diff --git a/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php b/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php index af8e71bd7..3d2ae99d9 100644 --- a/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php +++ b/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php @@ -66,6 +66,10 @@ public static function provideConfigs() 'sync' => false, 'lazy' => true, 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -83,6 +87,10 @@ public static function provideConfigs() 'sync' => false, 'lazy' => true, 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -100,6 +108,10 @@ public static function provideConfigs() 'sync' => false, 'lazy' => true, 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -118,6 +130,10 @@ public static function provideConfigs() 'lazy' => false, 'foo' => 'bar', 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -136,6 +152,10 @@ public static function provideConfigs() 'lazy' => false, 'foo' => 'bar', 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -154,6 +174,10 @@ public static function provideConfigs() 'lazy' => false, 'foo' => 'bar', 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -173,6 +197,10 @@ public static function provideConfigs() 'foo' => 'bar', 'ssl_on' => false, 'baz' => 'bazVal', + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -190,6 +218,10 @@ public static function provideConfigs() 'sync' => false, 'lazy' => true, 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; @@ -208,6 +240,10 @@ public static function provideConfigs() 'lazy' => true, 'foo' => 'bar', 'ssl_on' => false, + 'write_timeout' => 3, + 'read_timeout' => 60, + 'send_heartbeat' => 0, + 'receive_heartbeat' => 0, ], ]; } diff --git a/pkg/test/RabbitmqStompExtension.php b/pkg/test/RabbitmqStompExtension.php index d381d1fcf..240f67edb 100644 --- a/pkg/test/RabbitmqStompExtension.php +++ b/pkg/test/RabbitmqStompExtension.php @@ -8,9 +8,14 @@ trait RabbitmqStompExtension { + private function getDsn() + { + return getenv('RABITMQ_STOMP_DSN'); + } + private function buildStompContext(): StompContext { - if (false == $dsn = getenv('RABITMQ_STOMP_DSN')) { + if (false == $dsn = $this->getDsn()) { throw new SkippedTestError('Functional tests are not allowed in this environment'); }