Skip to content

Commit 16a8707

Browse files
authored
chore: fix pubsub interop tests (#2191)
When opening outbound streams, only make sure no outbound pubsub streams exist on the connection, not just any pubsub streams. Fixes a race condition where the remote peer can open streams before us which then prevents us opening streams.
1 parent 3bdaad3 commit 16a8707

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

packages/libp2p/test/interop.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> {
129129
},
130130
transports: [tcp(), circuitRelayTransport()],
131131
streamMuxers: [],
132-
connectionEncryption: [noise()]
132+
connectionEncryption: [noise()],
133+
connectionManager: {
134+
minConnections: 0
135+
}
133136
}
134137

135138
const services: ServiceFactoryMap = {

packages/pubsub/src/index.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
214214
log('connected %p', peerId)
215215

216216
// if this connection is already in use for pubsub, ignore it
217-
if (conn.streams.find(stream => stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) {
217+
if (conn.streams.find(stream => stream.direction === 'outbound' && stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) {
218+
log('outbound pubsub streams already present on connection from %p', peerId)
218219
return
219220
}
220221

@@ -533,8 +534,14 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
533534
sendRpc (peer: PeerId, rpc: PubSubRPC): void {
534535
const peerStreams = this.peers.get(peer)
535536

536-
if (peerStreams == null || !peerStreams.isWritable) {
537-
log.error('Cannot send RPC to %p as there is no open stream to it available', peer)
537+
if (peerStreams == null) {
538+
log.error('Cannot send RPC to %p as there are no streams to it available', peer)
539+
540+
return
541+
}
542+
543+
if (!peerStreams.isWritable) {
544+
log.error('Cannot send RPC to %p as there is no outbound stream to it available', peer)
538545

539546
return
540547
}

0 commit comments

Comments
 (0)