Skip to content

Commit bef1927

Browse files
authored
fix(libp2p): registrar topology notification fixes
1 parent 539fdc2 commit bef1927

File tree

1 file changed

+36
-11
lines changed

1 file changed

+36
-11
lines changed

packages/libp2p/src/registrar.ts

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { PeerStore } from '@libp2p/interface/peer-store'
99
import type { Topology } from '@libp2p/interface/topology'
1010
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
1111
import type { StreamHandlerOptions, StreamHandlerRecord, Registrar, StreamHandler } from '@libp2p/interface-internal/registrar'
12+
import { PeerSet } from '@libp2p/peer-collections'
1213

1314
const log = logger('libp2p:registrar')
1415

@@ -26,11 +27,13 @@ export interface RegistrarComponents {
2627
* Responsible for notifying registered protocols of events in the network.
2728
*/
2829
export class DefaultRegistrar implements Registrar {
30+
private readonly topologyPeers: Map<string, PeerSet>
2931
private readonly topologies: Map<string, Map<string, Topology>>
3032
private readonly handlers: Map<string, StreamHandlerRecord>
3133
private readonly components: RegistrarComponents
3234

3335
constructor (components: RegistrarComponents) {
36+
this.topologyPeers = new Map()
3437
this.topologies = new Map()
3538
this.handlers = new Map()
3639
this.components = components
@@ -130,6 +133,7 @@ export class DefaultRegistrar implements Registrar {
130133
}
131134

132135
topologies.set(id, topology)
136+
this.topologyPeers.set(id, new PeerSet())
133137

134138
return id
135139
}
@@ -138,6 +142,7 @@ export class DefaultRegistrar implements Registrar {
138142
* Unregister topology
139143
*/
140144
unregister (id: string): void {
145+
this.topologyPeers.delete(id)
141146
for (const [protocol, topologies] of this.topologies.entries()) {
142147
if (topologies.has(id)) {
143148
topologies.delete(id)
@@ -185,9 +190,8 @@ export class DefaultRegistrar implements Registrar {
185190
*/
186191
_onPeerUpdate (evt: CustomEvent<PeerUpdate>): void {
187192
const { peer, previous } = evt.detail
188-
const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol))
189-
const added = peer.protocols.filter(protocol => !(previous?.protocols ?? []).includes(protocol))
190193

194+
const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol))
191195
for (const protocol of removed) {
192196
const topologies = this.topologies.get(protocol)
193197

@@ -196,26 +200,47 @@ export class DefaultRegistrar implements Registrar {
196200
continue
197201
}
198202

199-
for (const topology of topologies.values()) {
200-
topology.onDisconnect?.(peer.id)
203+
for (const [id, topology] of topologies.entries()) {
204+
const peers = this.topologyPeers.get(id)
205+
if (peers == null) {
206+
continue
207+
}
208+
209+
for (const peer of peers) {
210+
topology.onDisconnect?.(peer)
211+
}
212+
213+
peers.clear()
201214
}
202215
}
203216

204-
for (const protocol of added) {
217+
const connections = this.components.connectionManager.getConnections(peer.id)
218+
if (connections.length === 0) {
219+
return
220+
}
221+
222+
const nonTransientConnection = connections.find((connection) => connection.transient === false)
223+
for (const protocol of peer.protocols) {
205224
const topologies = this.topologies.get(protocol)
206225

207226
if (topologies == null) {
208227
// no topologies are interested in this protocol
209228
continue
210229
}
211230

212-
for (const connection of this.components.connectionManager.getConnections(peer.id)) {
213-
for (const topology of topologies.values()) {
214-
if (connection.transient && topology.notifyOnTransient !== true) {
215-
continue
216-
}
231+
232+
for (const [id, topology] of topologies.entries()) {
233+
const peers = this.topologyPeers.get(id)
234+
if (peers?.has(peer.id)) {
235+
continue
236+
}
217237

218-
topology.onConnect?.(peer.id, connection)
238+
if (topology.notifyOnTransient) {
239+
topology.onConnect?.(peer.id, connections[0])
240+
peers?.add(peer.id)
241+
} else if (nonTransientConnection != null) {
242+
topology.onConnect?.(peer.id, nonTransientConnection)
243+
peers?.add(peer.id)
219244
}
220245
}
221246
}

0 commit comments

Comments
 (0)