Skip to content

Commit 4a7b8f2

Browse files
committed
monitoring
1 parent 299db53 commit 4a7b8f2

6 files changed

+188
-87
lines changed

MessageStats.php renamed to ConsumedMessageStats.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Enqueue\Monitoring;
66

7-
class MessageStats implements Stats
7+
class ConsumedMessageStats implements Stats
88
{
99
const STATUS_ACK = 'acknowledged';
1010
const STATUS_REJECTED = 'rejected';

InfluxDbStorage.php

+90-68
Original file line numberDiff line numberDiff line change
@@ -2,144 +2,156 @@
22

33
namespace Enqueue\Monitoring;
44

5+
use Enqueue\Client\Config;
6+
use Enqueue\Dsn\Dsn;
57
use InfluxDB\Client;
68
use InfluxDB\Database;
79
use InfluxDB\Point;
810

911
class InfluxDbStorage implements StatsStorage
1012
{
1113
/**
12-
* @var Client
14+
* @var array
1315
*/
14-
private $client;
16+
private $config;
1517

1618
/**
17-
* @var string
18-
*/
19-
private $dbName;
20-
21-
/**
22-
* @var string
23-
*/
24-
private $measurementMessages;
25-
26-
/**
27-
* @var string
19+
* @var Client
2820
*/
29-
private $measurementConsumers;
21+
private $client;
3022

3123
/**
3224
* @var Database
3325
*/
3426
private $database;
3527

3628
/**
37-
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost.
29+
* The config could be an array, string DSN, null or instance of InfluxDB\Client. In case of null it will attempt to connect to localhost.
3830
*
3931
* $config = [
40-
* 'dsn' => 'wamp://127.0.0.1:9090',
41-
* 'host' => '127.0.0.1',
42-
* 'port' => '9090',
43-
* 'topic' => 'stats',
44-
* 'max_retries' => 15,
45-
* 'initial_retry_delay' => 1.5,
46-
* 'max_retry_delay' => 300,
47-
* 'retry_delay_growth' => 1.5,
32+
* 'dsn' => 'influxdb://127.0.0.1:8086',
33+
* 'host' => '127.0.0.1',
34+
* 'port' => '8086',
35+
* 'user' => '',
36+
* 'password' => '',
37+
* 'db' => 'enqueue',
38+
* 'measurementSentMessages' => 'sent-messages',
39+
* 'measurementConsumedMessages' => 'consumed-messages',
40+
* 'measurementConsumers' => 'consumers',
4841
* ]
4942
*
5043
* or
5144
*
52-
* wamp://127.0.0.1:9090?max_retries=10
45+
* influxdb://127.0.0.1:8086?user=Jon&password=secret
5346
*
5447
* @param array|string|null $config
5548
*/
5649
public function __construct($config = 'influxdb:')
5750
{
58-
$this->client = $client;
59-
$this->dbName = $dbName;
60-
$this->measurementMessages = 'messages';
61-
$this->measurementConsumers = 'consumers';
62-
6351
if (false == class_exists(Client::class)) {
6452
throw new \LogicException('Seems client library is not installed. Please install "influxdb/influxdb-php"');
6553
}
6654

6755
if (empty($config)) {
68-
$config = $this->parseDsn('influxdb:');
56+
$config = [];
6957
} elseif (is_string($config)) {
7058
$config = $this->parseDsn($config);
7159
} elseif (is_array($config)) {
7260
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
61+
} elseif ($config instanceof Client) {
62+
$this->client = $config;
63+
$config = [];
7364
} else {
7465
throw new \LogicException('The config must be either an array of options, a DSN string or null');
7566
}
7667

7768
$config = array_replace([
7869
'host' => '127.0.0.1',
79-
'port' => '9090',
80-
'topic' => 'stats',
81-
'max_retries' => 15,
82-
'initial_retry_delay' => 1.5,
83-
'max_retry_delay' => 300,
84-
'retry_delay_growth' => 1.5,
70+
'port' => '8086',
71+
'user' => '',
72+
'password' => '',
73+
'db' => 'enqueue',
74+
'measurementSentMessages' => 'sent-messages',
75+
'measurementConsumedMessages' => 'consumed-messages',
76+
'measurementConsumers' => 'consumers',
8577
], $config);
8678

8779
$this->config = $config;
8880
}
8981

90-
public function pushConsumerStats(ConsumerStats $event): void
82+
public function pushConsumerStats(ConsumerStats $stats): void
9183
{
9284
$points = [];
9385

94-
foreach ($event->getQueues() as $queue) {
86+
foreach ($stats->getQueues() as $queue) {
9587
$tags = [
9688
'queue' => $queue,
97-
'consumerId' => $event->getConsumerId(),
89+
'consumerId' => $stats->getConsumerId(),
9890
];
9991

10092
$values = [
101-
'startedAtMs' => $event->getStartedAtMs(),
102-
'started' => $event->isStarted(),
103-
'finished' => $event->isFinished(),
104-
'failed' => $event->isFailed(),
105-
'received' => $event->getReceived(),
106-
'acknowledged' => $event->getAcknowledged(),
107-
'rejected' => $event->getRejected(),
108-
'requeued' => $event->getRequeued(),
109-
'memoryUsage' => $event->getMemoryUsage(),
110-
'systemLoad' => $event->getSystemLoad(),
93+
'startedAtMs' => $stats->getStartedAtMs(),
94+
'started' => $stats->isStarted(),
95+
'finished' => $stats->isFinished(),
96+
'failed' => $stats->isFailed(),
97+
'received' => $stats->getReceived(),
98+
'acknowledged' => $stats->getAcknowledged(),
99+
'rejected' => $stats->getRejected(),
100+
'requeued' => $stats->getRequeued(),
101+
'memoryUsage' => $stats->getMemoryUsage(),
102+
'systemLoad' => $stats->getSystemLoad(),
111103
];
112104

113-
if ($event->getFinishedAtMs()) {
114-
$values['finishedAtMs'] = $event->getFinishedAtMs();
105+
if ($stats->getFinishedAtMs()) {
106+
$values['finishedAtMs'] = $stats->getFinishedAtMs();
115107
}
116108

117-
$points[] = new Point($this->measurementConsumers, null, $tags, $values, $event->getTimestampMs());
109+
$points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs());
118110
}
119111

120112
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
121113
}
122114

123-
public function pushMessageStats(MessageStats $event): void
115+
public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
124116
{
125117
$tags = [
126-
'queue' => $event->getQueue(),
127-
'status' => $event->getStatus(),
118+
'queue' => $stats->getQueue(),
119+
'status' => $stats->getStatus(),
128120
];
129121

130122
$values = [
131-
'receivedAt' => $event->getReceivedAtMs(),
132-
'processedAt' => $event->getTimestampMs(),
123+
'receivedAt' => $stats->getReceivedAtMs(),
124+
'processedAt' => $stats->getTimestampMs(),
133125
];
134126

135-
if (MessageStats::STATUS_FAILED === $event->getStatus()) {
127+
if (ConsumedMessageStats::STATUS_FAILED === $stats->getStatus()) {
136128
$values['failed'] = 1;
137129
}
138130

139-
$runtime = $event->getTimestampMs() - $event->getReceivedAtMs();
131+
$runtime = $stats->getTimestampMs() - $stats->getReceivedAtMs();
140132

141133
$points = [
142-
new Point($this->measurementMessages, $runtime, $tags, $values, $event->getTimestampMs()),
134+
new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()),
135+
];
136+
137+
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
138+
}
139+
140+
public function pushSentMessageStats(SentMessageStats $stats): void
141+
{
142+
$tags = [];
143+
$properties = $stats->getProperties();
144+
145+
if (false === empty($properties[Config::TOPIC])) {
146+
$tags['topic'] = $properties[Config::TOPIC];
147+
}
148+
149+
if (false === empty($properties[Config::COMMAND])) {
150+
$tags['command'] = $properties[Config::COMMAND];
151+
}
152+
153+
$points = [
154+
new Point($this->config['measurementSentMessages'], null, $tags, [], $stats->getTimestampMs()),
143155
];
144156

145157
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
@@ -148,7 +160,16 @@ public function pushMessageStats(MessageStats $event): void
148160
private function getDb(): Database
149161
{
150162
if (null === $this->database) {
151-
$this->database = $this->client->selectDB($this->dbName);
163+
if (null === $this->client) {
164+
$this->client = new Client(
165+
$this->config['host'],
166+
$this->config['port'],
167+
$this->config['user'],
168+
$this->config['password']
169+
);
170+
}
171+
172+
$this->database = $this->client->selectDB($this->config['db']);
152173
$this->database->create();
153174
}
154175

@@ -159,21 +180,22 @@ private function parseDsn(string $dsn): array
159180
{
160181
$dsn = new Dsn($dsn);
161182

162-
if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) {
183+
if (false === in_array($dsn->getSchemeProtocol(), ['influxdb'], true)) {
163184
throw new \LogicException(sprintf(
164-
'The given scheme protocol "%s" is not supported. It must be "wamp"',
185+
'The given scheme protocol "%s" is not supported. It must be "influxdb"',
165186
$dsn->getSchemeProtocol()
166187
));
167188
}
168189

169190
return array_filter(array_replace($dsn->getQuery(), [
170191
'host' => $dsn->getHost(),
171192
'port' => $dsn->getPort(),
172-
'topic' => $dsn->getQueryParameter('topic'),
173-
'max_retries' => $dsn->getInt('max_retries'),
174-
'initial_retry_delay' => $dsn->getFloat('initial_retry_delay'),
175-
'max_retry_delay' => $dsn->getInt('max_retry_delay'),
176-
'retry_delay_growth' => $dsn->getFloat('retry_delay_growth'),
193+
'user' => $dsn->getUser(),
194+
'password' => $dsn->getPassword(),
195+
'db' => $dsn->getQueryParameter('db'),
196+
'measurementSentMessages' => $dsn->getQueryParameter('measurementSentMessages'),
197+
'measurementConsumedMessages' => $dsn->getQueryParameter('measurementConsumedMessages'),
198+
'measurementConsumers' => $dsn->getQueryParameter('measurementConsumers'),
177199
]), function ($value) { return null !== $value; });
178200
}
179201
}

0 commit comments

Comments
 (0)