Skip to content

Commit 50a1a56

Browse files
authored
Merge pull request #826 from alessandroniciforo/master
Add support for using the /topic prefix instead of /exchange.
2 parents e56b22c + 47f8bcf commit 50a1a56

6 files changed

+114
-5
lines changed

Diff for: docs/transport/stomp.md

+6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ $factory = new StompConnectionFactory('stomp:');
3636
// same as above
3737
$factory = new StompConnectionFactory([]);
3838

39+
// connect via stomp to RabbitMQ (default) - the topic names are prefixed with /exchange
40+
$factory = new StompConnectionFactory('stomp+rabbitmq:');
41+
42+
// connect via stomp to ActiveMQ - the topic names are prefixed with /topic
43+
$factory = new StompConnectionFactory('stomp+activemq:');
44+
3945
// connect to stomp broker at example.com port 1000 using
4046
$factory = new StompConnectionFactory([
4147
'host' => 'example.com',

Diff for: pkg/stomp/StompConnectionFactory.php

+17-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111

1212
class StompConnectionFactory implements ConnectionFactory
1313
{
14+
const SCHEME_EXT_ACTIVEMQ = 'activemq';
15+
const SCHEME_EXT_RABBITMQ = 'rabbitmq';
16+
1417
/**
1518
* @var array
1619
*/
@@ -66,13 +69,15 @@ public function __construct($config = 'stomp:')
6669
*/
6770
public function createContext(): Context
6871
{
72+
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;
73+
6974
if ($this->config['lazy']) {
7075
return new StompContext(function () {
7176
return $this->establishConnection();
72-
});
77+
}, $useExchangePrefix);
7378
}
7479

75-
return new StompContext($this->establishConnection());
80+
return new StompContext($this->establishConnection(), $useExchangePrefix);
7681
}
7782

7883
private function establishConnection(): BufferedStompClient
@@ -103,7 +108,16 @@ private function parseDsn(string $dsn): array
103108
throw new \LogicException(sprintf('The given DSN is not supported. Must start with "stomp:".'));
104109
}
105110

111+
$schemeExtension = current($dsn->getSchemeExtensions());
112+
if (false === $schemeExtension) {
113+
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
114+
}
115+
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
116+
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
117+
}
118+
106119
return array_filter(array_replace($dsn->getQuery(), [
120+
'target' => $schemeExtension,
107121
'host' => $dsn->getHost(),
108122
'port' => $dsn->getPort(),
109123
'login' => $dsn->getUser(),
@@ -120,6 +134,7 @@ private function parseDsn(string $dsn): array
120134
private function defaultConfig(): array
121135
{
122136
return [
137+
'target' => self::SCHEME_EXT_RABBITMQ,
123138
'host' => 'localhost',
124139
'port' => 61613,
125140
'login' => 'guest',

Diff for: pkg/stomp/StompContext.php

+10-2
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ class StompContext implements Context
2323
*/
2424
private $stomp;
2525

26+
/**
27+
* @var bool
28+
*/
29+
private $useExchangePrefix;
30+
2631
/**
2732
* @var callable
2833
*/
2934
private $stompFactory;
3035

3136
/**
3237
* @param BufferedStompClient|callable $stomp
38+
* @param bool $useExchangePrefix
3339
*/
34-
public function __construct($stomp)
40+
public function __construct($stomp, $useExchangePrefix = true)
3541
{
3642
if ($stomp instanceof BufferedStompClient) {
3743
$this->stomp = $stomp;
@@ -40,6 +46,8 @@ public function __construct($stomp)
4046
} else {
4147
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
4248
}
49+
50+
$this->useExchangePrefix = $useExchangePrefix;
4351
}
4452

4553
/**
@@ -84,7 +92,7 @@ public function createTopic(string $name): Topic
8492
{
8593
if (0 !== strpos($name, '/')) {
8694
$destination = new StompDestination();
87-
$destination->setType(StompDestination::TYPE_EXCHANGE);
95+
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
8896
$destination->setStompName($name);
8997

9098
return $destination;

Diff for: pkg/stomp/Tests/StompConnectionFactoryConfigTest.php

+43
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public static function provideConfigs()
5555
yield [
5656
null,
5757
[
58+
'target' => 'rabbitmq',
5859
'host' => 'localhost',
5960
'port' => 61613,
6061
'login' => 'guest',
@@ -71,6 +72,7 @@ public static function provideConfigs()
7172
yield [
7273
'stomp:',
7374
[
75+
'target' => 'rabbitmq',
7476
'host' => 'localhost',
7577
'port' => 61613,
7678
'login' => 'guest',
@@ -87,6 +89,7 @@ public static function provideConfigs()
8789
yield [
8890
[],
8991
[
92+
'target' => 'rabbitmq',
9093
'host' => 'localhost',
9194
'port' => 61613,
9295
'login' => 'guest',
@@ -103,6 +106,43 @@ public static function provideConfigs()
103106
yield [
104107
'stomp://localhost:1234?foo=bar&lazy=0&sync=true',
105108
[
109+
'target' => 'rabbitmq',
110+
'host' => 'localhost',
111+
'port' => 1234,
112+
'login' => 'guest',
113+
'password' => 'guest',
114+
'vhost' => '/',
115+
'buffer_size' => 1000,
116+
'connection_timeout' => 1,
117+
'sync' => true,
118+
'lazy' => false,
119+
'foo' => 'bar',
120+
'ssl_on' => false,
121+
],
122+
];
123+
124+
yield [
125+
'stomp+activemq://localhost:1234?foo=bar&lazy=0&sync=true',
126+
[
127+
'target' => 'activemq',
128+
'host' => 'localhost',
129+
'port' => 1234,
130+
'login' => 'guest',
131+
'password' => 'guest',
132+
'vhost' => '/',
133+
'buffer_size' => 1000,
134+
'connection_timeout' => 1,
135+
'sync' => true,
136+
'lazy' => false,
137+
'foo' => 'bar',
138+
'ssl_on' => false,
139+
],
140+
];
141+
142+
yield [
143+
'stomp+rabbitmq://localhost:1234?foo=bar&lazy=0&sync=true',
144+
[
145+
'target' => 'rabbitmq',
106146
'host' => 'localhost',
107147
'port' => 1234,
108148
'login' => 'guest',
@@ -120,6 +160,7 @@ public static function provideConfigs()
120160
yield [
121161
['dsn' => 'stomp://localhost:1234/theVhost?foo=bar&lazy=0&sync=true', 'baz' => 'bazVal', 'foo' => 'fooVal'],
122162
[
163+
'target' => 'rabbitmq',
123164
'host' => 'localhost',
124165
'port' => 1234,
125166
'login' => 'guest',
@@ -138,6 +179,7 @@ public static function provideConfigs()
138179
yield [
139180
['dsn' => 'stomp:///%2f'],
140181
[
182+
'target' => 'rabbitmq',
141183
'host' => 'localhost',
142184
'port' => 61613,
143185
'login' => 'guest',
@@ -154,6 +196,7 @@ public static function provideConfigs()
154196
yield [
155197
['host' => 'localhost', 'port' => 1234, 'foo' => 'bar'],
156198
[
199+
'target' => 'rabbitmq',
157200
'host' => 'localhost',
158201
'port' => 1234,
159202
'login' => 'guest',

Diff for: pkg/stomp/Tests/StompConnectionFactoryTest.php

+25
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,31 @@ public function testShouldCreateLazyContext()
2525
$this->assertInstanceOf(StompContext::class, $context);
2626

2727
$this->assertAttributeEquals(null, 'stomp', $context);
28+
$this->assertAttributeEquals(true, 'useExchangePrefix', $context);
2829
$this->assertInternalType('callable', $this->readAttribute($context, 'stompFactory'));
2930
}
31+
32+
public function testShouldCreateRabbitMQContext()
33+
{
34+
$factory = new StompConnectionFactory('stomp+rabbitmq://');
35+
36+
$context = $factory->createContext();
37+
38+
$this->assertInstanceOf(StompContext::class, $context);
39+
40+
$this->assertAttributeEquals(null, 'stomp', $context);
41+
$this->assertAttributeEquals(true, 'useExchangePrefix', $context);
42+
}
43+
44+
public function testShouldCreateActiveMQContext()
45+
{
46+
$factory = new StompConnectionFactory('stomp+activemq://');
47+
48+
$context = $factory->createContext();
49+
50+
$this->assertInstanceOf(StompContext::class, $context);
51+
52+
$this->assertAttributeEquals(null, 'stomp', $context);
53+
$this->assertAttributeEquals(false, 'useExchangePrefix', $context);
54+
}
3055
}

Diff for: pkg/stomp/Tests/StompContextTest.php

+13-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public function testCreateQueueShouldCreateDestinationIfNameIsFullDestinationStr
7979
$this->assertEquals('/amq/queue/name/routing-key', $destination->getQueueName());
8080
}
8181

82-
public function testShouldCreateTopicInstance()
82+
public function testShouldCreateTopicInstanceWithExchangePrefix()
8383
{
8484
$context = new StompContext($this->createStompClientMock());
8585

@@ -91,6 +91,18 @@ public function testShouldCreateTopicInstance()
9191
$this->assertSame(StompDestination::TYPE_EXCHANGE, $topic->getType());
9292
}
9393

94+
public function testShouldCreateTopicInstanceWithTopicPrefix()
95+
{
96+
$context = new StompContext($this->createStompClientMock(), false);
97+
98+
$topic = $context->createTopic('the name');
99+
100+
$this->assertInstanceOf(StompDestination::class, $topic);
101+
$this->assertSame('/topic/the name', $topic->getQueueName());
102+
$this->assertSame('/topic/the name', $topic->getTopicName());
103+
$this->assertSame(StompDestination::TYPE_TOPIC, $topic->getType());
104+
}
105+
94106
public function testCreateTopicShouldCreateDestinationIfNameIsFullDestinationString()
95107
{
96108
$context = new StompContext($this->createStompClientMock());

0 commit comments

Comments
 (0)