@@ -21,28 +21,70 @@ class InfluxDbStorage implements StatsStorage
21
21
/**
22
22
* @var string
23
23
*/
24
- private $ measurMessages ;
24
+ private $ measurementMessages ;
25
25
26
26
/**
27
27
* @var string
28
28
*/
29
- private $ measurConsumers ;
29
+ private $ measurementConsumers ;
30
30
31
31
/**
32
32
* @var Database
33
33
*/
34
34
private $ database ;
35
35
36
36
/**
37
- * @param Client $client
38
- * @param string $dbName
37
+ * The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost.
38
+ *
39
+ * $config = [
40
+ * 'dsn' => 'wamp://127.0.0.1:9090',
41
+ * 'host' => '127.0.0.1',
42
+ * 'port' => '9090',
43
+ * 'topic' => 'stats',
44
+ * 'max_retries' => 15,
45
+ * 'initial_retry_delay' => 1.5,
46
+ * 'max_retry_delay' => 300,
47
+ * 'retry_delay_growth' => 1.5,
48
+ * ]
49
+ *
50
+ * or
51
+ *
52
+ * wamp://127.0.0.1:9090?max_retries=10
53
+ *
54
+ * @param array|string|null $config
39
55
*/
40
- public function __construct (Client $ client , string $ dbName )
56
+ public function __construct ($ config = ' influxdb: ' )
41
57
{
42
58
$ this ->client = $ client ;
43
59
$ this ->dbName = $ dbName ;
44
- $ this ->measurMessages = 'messages ' ;
45
- $ this ->measurConsumers = 'consumers ' ;
60
+ $ this ->measurementMessages = 'messages ' ;
61
+ $ this ->measurementConsumers = 'consumers ' ;
62
+
63
+ if (false == class_exists (Client::class)) {
64
+ throw new \LogicException ('Seems client library is not installed. Please install "influxdb/influxdb-php" ' );
65
+ }
66
+
67
+ if (empty ($ config )) {
68
+ $ config = $ this ->parseDsn ('influxdb: ' );
69
+ } elseif (is_string ($ config )) {
70
+ $ config = $ this ->parseDsn ($ config );
71
+ } elseif (is_array ($ config )) {
72
+ $ config = empty ($ config ['dsn ' ]) ? $ config : $ this ->parseDsn ($ config ['dsn ' ]);
73
+ } else {
74
+ throw new \LogicException ('The config must be either an array of options, a DSN string or null ' );
75
+ }
76
+
77
+ $ config = array_replace ([
78
+ 'host ' => '127.0.0.1 ' ,
79
+ 'port ' => '9090 ' ,
80
+ 'topic ' => 'stats ' ,
81
+ 'max_retries ' => 15 ,
82
+ 'initial_retry_delay ' => 1.5 ,
83
+ 'max_retry_delay ' => 300 ,
84
+ 'retry_delay_growth ' => 1.5 ,
85
+ ], $ config );
86
+
87
+ $ this ->config = $ config ;
46
88
}
47
89
48
90
public function pushConsumerStats (ConsumerStats $ event ): void
@@ -72,7 +114,7 @@ public function pushConsumerStats(ConsumerStats $event): void
72
114
$ values ['finishedAtMs ' ] = $ event ->getFinishedAtMs ();
73
115
}
74
116
75
- $ points [] = new Point ($ this ->measurConsumers , null , $ tags , $ values , $ event ->getTimestampMs ());
117
+ $ points [] = new Point ($ this ->measurementConsumers , null , $ tags , $ values , $ event ->getTimestampMs ());
76
118
}
77
119
78
120
$ this ->getDb ()->writePoints ($ points , Database::PRECISION_MILLISECONDS );
@@ -97,7 +139,7 @@ public function pushMessageStats(MessageStats $event): void
97
139
$ runtime = $ event ->getTimestampMs () - $ event ->getReceivedAtMs ();
98
140
99
141
$ points = [
100
- new Point ($ this ->measurMessages , $ runtime , $ tags , $ values , $ event ->getTimestampMs ()),
142
+ new Point ($ this ->measurementMessages , $ runtime , $ tags , $ values , $ event ->getTimestampMs ()),
101
143
];
102
144
103
145
$ this ->getDb ()->writePoints ($ points , Database::PRECISION_MILLISECONDS );
@@ -112,4 +154,26 @@ private function getDb(): Database
112
154
113
155
return $ this ->database ;
114
156
}
157
+
158
+ private function parseDsn (string $ dsn ): array
159
+ {
160
+ $ dsn = new Dsn ($ dsn );
161
+
162
+ if (false === in_array ($ dsn ->getSchemeProtocol (), ['wamp ' , 'ws ' ], true )) {
163
+ throw new \LogicException (sprintf (
164
+ 'The given scheme protocol "%s" is not supported. It must be "wamp" ' ,
165
+ $ dsn ->getSchemeProtocol ()
166
+ ));
167
+ }
168
+
169
+ return array_filter (array_replace ($ dsn ->getQuery (), [
170
+ 'host ' => $ dsn ->getHost (),
171
+ 'port ' => $ dsn ->getPort (),
172
+ 'topic ' => $ dsn ->getQueryParameter ('topic ' ),
173
+ 'max_retries ' => $ dsn ->getInt ('max_retries ' ),
174
+ 'initial_retry_delay ' => $ dsn ->getFloat ('initial_retry_delay ' ),
175
+ 'max_retry_delay ' => $ dsn ->getInt ('max_retry_delay ' ),
176
+ 'retry_delay_growth ' => $ dsn ->getFloat ('retry_delay_growth ' ),
177
+ ]), function ($ value ) { return null !== $ value ; });
178
+ }
115
179
}
0 commit comments