Skip to content

Commit 4fd7eb2

Browse files
authored
feat: add capability detection to metrics-devtools (#2708)
Adds the ability to find configured services that possess a given capability and interact with that service via rpc. Starts with a configured PubSub service.
1 parent 6ccbb06 commit 4fd7eb2

File tree

7 files changed

+136
-27
lines changed

7 files changed

+136
-27
lines changed

packages/metrics-devtools/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
"doc-check": "aegir doc-check",
6565
"build": "aegir build",
6666
"test": "aegir test -t browser",
67-
"test:chrome": "aegir test -t browser --cov"
67+
"test:chrome": "aegir test -t browser --cov",
68+
"test:firefox": "aegir test -t browser --browser firefox"
6869
},
6970
"dependencies": {
7071
"@libp2p/interface": "^2.0.1",

packages/metrics-devtools/src/index.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* for Chrome or Firefox to inspect the state of your running node.
1717
*/
1818

19-
import { serviceCapabilities, start, stop } from '@libp2p/interface'
19+
import { isPubSub, serviceCapabilities, start, stop } from '@libp2p/interface'
2020
import { simpleMetrics } from '@libp2p/simple-metrics'
2121
import { pipe } from 'it-pipe'
2222
import { pushable } from 'it-pushable'
@@ -25,10 +25,11 @@ import { base64 } from 'multiformats/bases/base64'
2525
import { valueCodecs } from './rpc/index.js'
2626
import { metricsRpc } from './rpc/rpc.js'
2727
import { debounce } from './utils/debounce.js'
28+
import { findCapability } from './utils/find-capability.js'
2829
import { getPeers } from './utils/get-peers.js'
2930
import { getSelf } from './utils/get-self.js'
3031
import type { DevToolsRPC } from './rpc/index.js'
31-
import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable } from '@libp2p/interface'
32+
import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable, Message, SubscriptionChangeData } from '@libp2p/interface'
3233
import type { TransportManager, Registrar, ConnectionManager, AddressManager } from '@libp2p/interface-internal'
3334
import type { Pushable } from 'it-pushable'
3435

@@ -175,6 +176,10 @@ class DevToolsMetrics implements Metrics, Startable {
175176
this.onSelfUpdate = debounce(this.onSelfUpdate.bind(this), 1000)
176177
this.onIncomingMessage = this.onIncomingMessage.bind(this)
177178

179+
// relay pubsub messages to dev tools panel
180+
this.onPubSubMessage = this.onPubSubMessage.bind(this)
181+
this.onPubSubSubscriptionChange = this.onPubSubSubscriptionChange.bind(this)
182+
178183
// collect metrics
179184
this.simpleMetrics = simpleMetrics({
180185
intervalMs: this.intervalMs,
@@ -257,6 +262,13 @@ class DevToolsMetrics implements Metrics, Startable {
257262
.catch(err => {
258263
this.log.error('error while reading RPC messages', err)
259264
})
265+
266+
const pubsub = findCapability('@libp2p/pubsub', this.components)
267+
268+
if (isPubSub(pubsub)) {
269+
pubsub.addEventListener('message', this.onPubSubMessage)
270+
pubsub.addEventListener('subscription-change', this.onPubSubSubscriptionChange)
271+
}
260272
}
261273

262274
async stop (): Promise<void> {
@@ -287,6 +299,24 @@ class DevToolsMetrics implements Metrics, Startable {
287299
}
288300
}
289301

302+
private onPubSubMessage (event: CustomEvent<Message>): void {
303+
this.devTools.safeDispatchEvent('pubsub:message', {
304+
detail: event.detail
305+
})
306+
.catch(err => {
307+
this.log.error('error relaying pubsub message', err)
308+
})
309+
}
310+
311+
private onPubSubSubscriptionChange (event: CustomEvent<SubscriptionChangeData>): void {
312+
this.devTools.safeDispatchEvent('pubsub:subscription-change', {
313+
detail: event.detail
314+
})
315+
.catch(err => {
316+
this.log.error('error relaying pubsub subscription change', err)
317+
})
318+
}
319+
290320
private onSelfUpdate (): void {
291321
Promise.resolve()
292322
.then(async () => {

packages/metrics-devtools/src/rpc/index.ts

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { cidCodec } from './codecs/cid.js'
22
import { customProgressEventCodec } from './codecs/custom-progress-event.js'
33
import { multiaddrCodec } from './codecs/multiaddr.js'
44
import { peerIdCodec } from './codecs/peer-id.js'
5-
import type { ContentRouting, PeerId, PeerRouting, AbortOptions } from '@libp2p/interface'
5+
import type { ContentRouting, PeerId, PeerRouting, AbortOptions, PubSubRPCMessage, SubscriptionChangeData } from '@libp2p/interface'
66
import type { OpenConnectionOptions } from '@libp2p/interface-internal'
77
import type { Multiaddr } from '@multiformats/multiaddr'
88
import type { ValueCodec } from 'it-rpc'
@@ -14,21 +14,6 @@ export const valueCodecs: Array<ValueCodec<any>> = [
1414
customProgressEventCodec
1515
]
1616

17-
export interface NodeAddress {
18-
multiaddr: Multiaddr
19-
listen?: boolean
20-
announce?: boolean
21-
observed?: boolean
22-
default?: boolean
23-
}
24-
25-
export interface NodeStatus {
26-
peerId: PeerId
27-
agent?: string
28-
addresses: NodeAddress[]
29-
protocols: string[]
30-
}
31-
3217
export interface PeerAddress {
3318
multiaddr: Multiaddr
3419
isConnected?: boolean
@@ -69,7 +54,7 @@ export interface MetricsRPC {
6954
/**
7055
* Called by DevTools on initial connect
7156
*/
72-
init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string }>
57+
init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string, capabilities: Record<string, string[]> }>
7358

7459
/**
7560
* Update the currently active debugging namespaces
@@ -95,6 +80,36 @@ export interface MetricsRPC {
9580
* Make peer routing queries
9681
*/
9782
peerRouting: PeerRouting
83+
84+
/**
85+
* PubSub operations
86+
*/
87+
pubsub: {
88+
/**
89+
* Subscribe to a PubSub topic
90+
*/
91+
subscribe(component: string, topic: string): Promise<void>
92+
93+
/**
94+
* Unsubscribe from a PubSub topic
95+
*/
96+
unsubscribe(component: string, topic: string): Promise<void>
97+
98+
/**
99+
* Get the list of subscriptions for the current node
100+
*/
101+
getTopics (component: string): Promise<string[]>
102+
103+
/**
104+
* Get the list of peers we know about who subscribe to the topic
105+
*/
106+
getSubscribers (component: string, topic: string): Promise<PeerId[]>
107+
108+
/**
109+
* Publish a message to a given topic
110+
*/
111+
publish (component: string, topic: string, message: Uint8Array): Promise<void>
112+
}
98113
}
99114

100115
export interface DevToolsEvents {
@@ -112,6 +127,16 @@ export interface DevToolsEvents {
112127
* The node's connected peers have changed
113128
*/
114129
'peers': CustomEvent<Peer[]>
130+
131+
/**
132+
* A pubsub message was received
133+
*/
134+
'pubsub:message': CustomEvent<PubSubRPCMessage>
135+
136+
/**
137+
* The subscriptions of a peer have changed
138+
*/
139+
'pubsub:subscription-change': CustomEvent<SubscriptionChangeData>
115140
}
116141

117142
/**

packages/metrics-devtools/src/rpc/rpc.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { enable, disable } from '@libp2p/logger'
22
import { peerIdFromString } from '@libp2p/peer-id'
33
import { multiaddr } from '@multiformats/multiaddr'
4+
import { gatherCapabilities } from '../utils/gather-capabilities.js'
45
import { getPeers } from '../utils/get-peers.js'
6+
import { getPubSub } from '../utils/get-pubsub.js'
57
import { getSelf } from '../utils/get-self.js'
68
import type { MetricsRPC } from './index.js'
79
import type { DevToolsMetricsComponents } from '../index.js'
810
import type { PeerId } from '@libp2p/interface'
9-
import type { OpenConnectionOptions } from '@libp2p/interface-internal'
1011
import type { Multiaddr } from '@multiformats/multiaddr'
11-
import type { AbortOptions } from 'it-pushable'
1212

1313
export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
1414
const log = components.logger.forComponent('libp2p:devtools-metrics:metrics-rpc')
@@ -18,10 +18,11 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
1818
return {
1919
self: await getSelf(components),
2020
peers: await getPeers(components, log),
21-
debug: localStorage.getItem('debug') ?? ''
21+
debug: localStorage.getItem('debug') ?? '',
22+
capabilities: gatherCapabilities(components)
2223
}
2324
},
24-
setDebug: async (namespace?: string) => {
25+
setDebug: async (namespace?) => {
2526
if (namespace?.length != null && namespace?.length > 0) {
2627
enable(namespace)
2728
localStorage.setItem('debug', namespace)
@@ -30,7 +31,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
3031
localStorage.removeItem('debug')
3132
}
3233
},
33-
openConnection: async (peerIdOrMultiaddr: string, options?: OpenConnectionOptions) => {
34+
openConnection: async (peerIdOrMultiaddr, options?) => {
3435
let peer: PeerId | Multiaddr
3536

3637
try {
@@ -41,7 +42,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
4142

4243
await components.connectionManager.openConnection(peer, options)
4344
},
44-
closeConnection: async (peerId: PeerId, options?: AbortOptions) => {
45+
closeConnection: async (peerId, options?) => {
4546
await Promise.all(
4647
components.connectionManager.getConnections(peerId)
4748
.map(async connection => {
@@ -54,6 +55,23 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
5455
)
5556
},
5657
contentRouting: components.contentRouting,
57-
peerRouting: components.peerRouting
58+
peerRouting: components.peerRouting,
59+
pubsub: {
60+
async getTopics (component) {
61+
return getPubSub(component, components).getTopics()
62+
},
63+
async subscribe (component, topic) {
64+
getPubSub(component, components).subscribe(topic)
65+
},
66+
async unsubscribe (component, topic) {
67+
getPubSub(component, components).unsubscribe(topic)
68+
},
69+
async publish (component, topic, message) {
70+
await getPubSub(component, components).publish(topic, message)
71+
},
72+
async getSubscribers (component: string, topic: string) {
73+
return getPubSub(component, components).getSubscribers(topic)
74+
}
75+
}
5876
}
5977
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { gatherCapabilities } from './gather-capabilities.js'
2+
3+
export function findCapability (capability: string, components: any): any | undefined {
4+
for (const [name, capabilities] of Object.entries(gatherCapabilities(components))) {
5+
if (capabilities.includes(capability)) {
6+
return components[name]
7+
}
8+
}
9+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { serviceCapabilities } from '@libp2p/interface'
2+
3+
export function gatherCapabilities (components: any): Record<string, string[]> {
4+
const capabilities: Record<string, string[]> = {}
5+
const services: Record<string, any> = components.components ?? components
6+
7+
Object.entries(services).forEach(([name, component]) => {
8+
if (component?.[serviceCapabilities] != null && Array.isArray(component[serviceCapabilities])) {
9+
capabilities[name] = component[serviceCapabilities]
10+
}
11+
})
12+
13+
return capabilities
14+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { InvalidParametersError, isPubSub } from '@libp2p/interface'
2+
import type { PubSub } from '@libp2p/interface'
3+
4+
export function getPubSub (component: string, components: any): PubSub {
5+
const pubsub = components[component]
6+
7+
if (!isPubSub(pubsub)) {
8+
throw new InvalidParametersError(`Component ${component} did not implement the PubSub interface`)
9+
}
10+
11+
return pubsub
12+
}

0 commit comments

Comments
 (0)