@@ -11,7 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
11
11
import type { Upgrader , Listener , ListenerEvents } from '@libp2p/interface-transport'
12
12
import type { Multiaddr } from '@multiformats/multiaddr'
13
13
import type { TCPCreateListenerOptions } from './index.js'
14
- import type { CounterGroup , Metric , Metrics } from '@libp2p/interface-metrics'
14
+ import type { CounterGroup , MetricGroup , Metrics } from '@libp2p/interface-metrics'
15
15
16
16
const log = logger ( 'libp2p:tcp:listener' )
17
17
@@ -39,7 +39,7 @@ const SERVER_STATUS_UP = 1
39
39
const SERVER_STATUS_DOWN = 0
40
40
41
41
export interface TCPListenerMetrics {
42
- status : Metric
42
+ status : MetricGroup
43
43
errors : CounterGroup
44
44
events : CounterGroup
45
45
}
@@ -52,12 +52,14 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
52
52
private readonly connections = new Set < MultiaddrConnection > ( )
53
53
private status : Status = { started : false }
54
54
private metrics ?: TCPListenerMetrics
55
+ private addr : string
55
56
56
57
constructor ( private readonly context : Context ) {
57
58
super ( )
58
59
59
60
context . keepAlive = context . keepAlive ?? true
60
61
62
+ this . addr = 'unknown'
61
63
this . server = net . createServer ( context , this . onSocket . bind ( this ) )
62
64
63
65
// https://nodejs.org/api/net.html#servermaxconnections
@@ -72,49 +74,56 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
72
74
if ( context . metrics != null ) {
73
75
// we are listening, register metrics for our port
74
76
const address = this . server . address ( )
75
- let addr : string
76
77
77
78
if ( address == null ) {
78
- addr = 'unknown'
79
+ this . addr = 'unknown'
79
80
} else if ( typeof address === 'string' ) {
80
81
// unix socket
81
- addr = address
82
+ this . addr = address
82
83
} else {
83
- addr = `${ address . address } :${ address . port } `
84
+ this . addr = `${ address . address } :${ address . port } `
84
85
}
85
86
86
- context . metrics ?. registerMetric ( `libp2p_tcp_connections_${ addr } _total` , {
87
+ context . metrics ?. registerMetricGroup ( 'libp2p_tcp_inbound_connections_total' , {
88
+ label : 'address' ,
87
89
help : 'Current active connections in TCP listener' ,
88
90
calculate : ( ) => {
89
- return this . connections . size
91
+ return {
92
+ [ this . addr ] : this . connections . size
93
+ }
90
94
}
91
95
} )
92
96
93
97
this . metrics = {
94
- status : context . metrics . registerMetric ( `libp2p_tcp_${ addr } _server_status_info` , {
95
- help : 'Current status of the TCP server'
98
+ status : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_status_info' , {
99
+ label : 'address' ,
100
+ help : 'Current status of the TCP listener socket'
96
101
} ) ,
97
- errors : context . metrics . registerCounterGroup ( `libp2p_tcp_ ${ addr } _server_errors_total` , {
98
- label : 'error ' ,
99
- help : 'Total count of TCP listener errors by error type'
102
+ errors : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_errors_total' , {
103
+ label : 'address ' ,
104
+ help : 'Total count of TCP listener errors by type'
100
105
} ) ,
101
- events : context . metrics . registerCounterGroup ( `libp2p_tcp_ ${ addr } _socket_events_total` , {
102
- label : 'event ' ,
103
- help : 'Total count of TCP socket events by event '
106
+ events : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_events_total' , {
107
+ label : 'address ' ,
108
+ help : 'Total count of TCP listener events by type '
104
109
} )
105
110
}
106
111
107
- this . metrics ?. status . update ( SERVER_STATUS_UP )
112
+ this . metrics ?. status . update ( {
113
+ [ this . addr ] : SERVER_STATUS_UP
114
+ } )
108
115
}
109
116
110
117
this . dispatchEvent ( new CustomEvent ( 'listening' ) )
111
118
} )
112
119
. on ( 'error' , err => {
113
- this . metrics ?. errors . increment ( { listen_error : true } )
120
+ this . metrics ?. errors . increment ( { [ ` ${ this . addr } listen_error` ] : true } )
114
121
this . dispatchEvent ( new CustomEvent < Error > ( 'error' , { detail : err } ) )
115
122
} )
116
123
. on ( 'close' , ( ) => {
117
- this . metrics ?. status . update ( SERVER_STATUS_DOWN )
124
+ this . metrics ?. status . update ( {
125
+ [ this . addr ] : SERVER_STATUS_DOWN
126
+ } )
118
127
this . dispatchEvent ( new CustomEvent ( 'close' ) )
119
128
} )
120
129
}
@@ -123,7 +132,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
123
132
// Avoid uncaught errors caused by unstable connections
124
133
socket . on ( 'error' , err => {
125
134
log ( 'socket error' , err )
126
- this . metrics ?. events . increment ( { error : true } )
135
+ this . metrics ?. events . increment ( { [ ` ${ this . addr } error` ] : true } )
127
136
} )
128
137
129
138
let maConn : MultiaddrConnection
@@ -132,19 +141,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
132
141
listeningAddr : this . status . started ? this . status . listeningAddr : undefined ,
133
142
socketInactivityTimeout : this . context . socketInactivityTimeout ,
134
143
socketCloseTimeout : this . context . socketCloseTimeout ,
135
- metrics : this . metrics ?. events
144
+ metrics : this . metrics ?. events ,
145
+ metricPrefix : `${ this . addr } `
136
146
} )
137
147
} catch ( err ) {
138
148
log . error ( 'inbound connection failed' , err )
139
- this . metrics ?. errors . increment ( { inbound_to_connection : true } )
149
+ this . metrics ?. errors . increment ( { [ ` ${ this . addr } inbound_to_connection` ] : true } )
140
150
return
141
151
}
142
152
143
153
log ( 'new inbound connection %s' , maConn . remoteAddr )
144
154
try {
145
155
this . context . upgrader . upgradeInbound ( maConn )
146
156
. then ( ( conn ) => {
147
- log ( 'inbound connection %s upgraded ' , maConn . remoteAddr )
157
+ log ( 'inbound connection upgraded %s ' , maConn . remoteAddr )
148
158
this . connections . add ( maConn )
149
159
150
160
socket . once ( 'close' , ( ) => {
@@ -159,7 +169,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
159
169
} )
160
170
. catch ( async err => {
161
171
log . error ( 'inbound connection failed' , err )
162
- this . metrics ?. errors . increment ( { inbound_upgrade : true } )
172
+ this . metrics ?. errors . increment ( { [ ` ${ this . addr } inbound_upgrade` ] : true } )
163
173
164
174
await attemptClose ( maConn )
165
175
} )
@@ -172,7 +182,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
172
182
attemptClose ( maConn )
173
183
. catch ( err => {
174
184
log . error ( 'closing inbound connection failed' , err )
175
- this . metrics ?. errors . increment ( { inbound_closing_failed : true } )
185
+ this . metrics ?. errors . increment ( { [ ` ${ this . addr } inbound_closing_failed` ] : true } )
176
186
} )
177
187
}
178
188
}
0 commit comments