@@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
11
11
import type { Upgrader , Listener , ListenerEvents } from '@libp2p/interface-transport'
12
12
import type { Multiaddr } from '@multiformats/multiaddr'
13
13
import type { TCPCreateListenerOptions } from './index.js'
14
+ import type { CounterGroup , Metric , Metrics } from '@libp2p/interface-metrics'
14
15
15
16
const log = logger ( 'libp2p:tcp:listener' )
16
17
@@ -31,6 +32,16 @@ interface Context extends TCPCreateListenerOptions {
31
32
socketInactivityTimeout ?: number
32
33
socketCloseTimeout ?: number
33
34
maxConnections ?: number
35
+ metrics ?: Metrics
36
+ }
37
+
38
+ const SERVER_STATUS_UP = 1
39
+ const SERVER_STATUS_DOWN = 0
40
+
41
+ export interface TCPListenerMetrics {
42
+ status : Metric
43
+ errors : CounterGroup
44
+ events : CounterGroup
34
45
}
35
46
36
47
type Status = { started : false } | { started : true , listeningAddr : Multiaddr , peerId : string | null }
@@ -39,8 +50,8 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
39
50
private readonly server : net . Server
40
51
/** Keep track of open connections to destroy in case of timeout */
41
52
private readonly connections = new Set < MultiaddrConnection > ( )
42
-
43
53
private status : Status = { started : false }
54
+ private metrics ?: TCPListenerMetrics
44
55
45
56
constructor ( private readonly context : Context ) {
46
57
super ( )
@@ -57,26 +68,75 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
57
68
}
58
69
59
70
this . server
60
- . on ( 'listening' , ( ) => this . dispatchEvent ( new CustomEvent ( 'listening' ) ) )
61
- . on ( 'error' , err => this . dispatchEvent ( new CustomEvent < Error > ( 'error' , { detail : err } ) ) )
62
- . on ( 'close' , ( ) => this . dispatchEvent ( new CustomEvent ( 'close' ) ) )
71
+ . on ( 'listening' , ( ) => {
72
+ if ( context . metrics != null ) {
73
+ // we are listening, register metrics for our port
74
+ const address = this . server . address ( )
75
+ let addr : string
76
+
77
+ if ( address == null ) {
78
+ addr = 'unknown'
79
+ } else if ( typeof address === 'string' ) {
80
+ // unix socket
81
+ addr = address
82
+ } else {
83
+ addr = `${ address . address } :${ address . port } `
84
+ }
85
+
86
+ context . metrics ?. registerMetric ( `libp2p_tcp_connections_${ addr } _count` , {
87
+ help : 'Current active connections in TCP listener' ,
88
+ calculate : ( ) => {
89
+ return this . connections . size
90
+ }
91
+ } )
92
+
93
+ this . metrics = {
94
+ status : context . metrics . registerMetric ( `libp2p_tcp_${ addr } _server_status` , {
95
+ help : 'Current status of the TCP server'
96
+ } ) ,
97
+ errors : context . metrics . registerCounterGroup ( `libp2p_tcp_${ addr } _server_errors_total` , {
98
+ label : 'error' ,
99
+ help : 'Total count of TCP listener errors by error type'
100
+ } ) ,
101
+ events : context . metrics . registerCounterGroup ( `libp2p_tcp_$${ addr } _socket_events` , {
102
+ label : 'event' ,
103
+ help : 'Total count of TCP socket events by event'
104
+ } )
105
+ }
106
+
107
+ this . metrics ?. status . update ( SERVER_STATUS_UP )
108
+ }
109
+
110
+ this . dispatchEvent ( new CustomEvent ( 'listening' ) )
111
+ } )
112
+ . on ( 'error' , err => {
113
+ this . metrics ?. errors . increment ( { listen_error : true } )
114
+ this . dispatchEvent ( new CustomEvent < Error > ( 'error' , { detail : err } ) )
115
+ } )
116
+ . on ( 'close' , ( ) => {
117
+ this . metrics ?. status . update ( SERVER_STATUS_DOWN )
118
+ this . dispatchEvent ( new CustomEvent ( 'close' ) )
119
+ } )
63
120
}
64
121
65
122
private onSocket ( socket : net . Socket ) {
66
123
// Avoid uncaught errors caused by unstable connections
67
124
socket . on ( 'error' , err => {
68
125
log ( 'socket error' , err )
126
+ this . metrics ?. events . increment ( { error : true } )
69
127
} )
70
128
71
129
let maConn : MultiaddrConnection
72
130
try {
73
131
maConn = toMultiaddrConnection ( socket , {
74
132
listeningAddr : this . status . started ? this . status . listeningAddr : undefined ,
75
133
socketInactivityTimeout : this . context . socketInactivityTimeout ,
76
- socketCloseTimeout : this . context . socketCloseTimeout
134
+ socketCloseTimeout : this . context . socketCloseTimeout ,
135
+ metrics : this . metrics ?. events
77
136
} )
78
137
} catch ( err ) {
79
138
log . error ( 'inbound connection failed' , err )
139
+ this . metrics ?. errors . increment ( { inbound_to_connection : true } )
80
140
return
81
141
}
82
142
@@ -99,6 +159,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
99
159
} )
100
160
. catch ( async err => {
101
161
log . error ( 'inbound connection failed' , err )
162
+ this . metrics ?. errors . increment ( { inbound_upgrade : true } )
102
163
103
164
await attemptClose ( maConn )
104
165
} )
@@ -111,6 +172,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
111
172
attemptClose ( maConn )
112
173
. catch ( err => {
113
174
log . error ( 'closing inbound connection failed' , err )
175
+ this . metrics ?. errors . increment ( { inbound_closing_failed : true } )
114
176
} )
115
177
}
116
178
}
0 commit comments