Skip to content

Commit 89fc431

Browse files
committed
fix: close short-lived connections first when pruning by tag value (libp2p#1509)
1 parent 1b30f81 commit 89fc431

File tree

2 files changed

+110
-29
lines changed

2 files changed

+110
-29
lines changed

src/connection-manager/index.ts

+27-15
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1-
import { logger } from '@libp2p/logger'
2-
import errCode from 'err-code'
3-
import mergeOptions from 'merge-options'
4-
import { LatencyMonitor, SummaryObject } from './latency-monitor.js'
5-
import type { AbortOptions } from '@libp2p/interfaces'
6-
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
7-
import type { Startable } from '@libp2p/interfaces/startable'
8-
import { codes } from '../errors.js'
9-
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
10-
import { setMaxListeners } from 'events'
111
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
122
import type { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager'
133
import * as STATUS from '@libp2p/interface-connection/status'
4+
import type { Metrics } from '@libp2p/interface-metrics'
5+
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
146
import type { AddressSorter, PeerStore } from '@libp2p/interface-peer-store'
15-
import { isMultiaddr, multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
16-
import { PeerMap } from '@libp2p/peer-collections'
17-
import { TimeoutController } from 'timeout-abort-controller'
187
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
19-
import { RateLimiterMemory } from 'rate-limiter-flexible'
20-
import type { Metrics } from '@libp2p/interface-metrics'
218
import type { Upgrader } from '@libp2p/interface-transport'
9+
import type { AbortOptions } from '@libp2p/interfaces'
10+
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
11+
import type { Startable } from '@libp2p/interfaces/startable'
12+
import { logger } from '@libp2p/logger'
13+
import { PeerMap } from '@libp2p/peer-collections'
14+
import { isMultiaddr, multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
15+
import errCode from 'err-code'
16+
import { setMaxListeners } from 'events'
17+
import mergeOptions from 'merge-options'
18+
import { RateLimiterMemory } from 'rate-limiter-flexible'
19+
import { TimeoutController } from 'timeout-abort-controller'
20+
import { codes } from '../errors.js'
2221
import { getPeer } from '../get-peer.js'
22+
import { LatencyMonitor, SummaryObject } from './latency-monitor.js'
2323

2424
const log = logger('libp2p:connection-manager')
2525

@@ -650,6 +650,18 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
650650
return -1
651651
}
652652

653+
// if the peers have an equal tag value then we want to close short-lived connections first
654+
const connectionALifespan = a.stat.timeline.open
655+
const connectionBLifespan = b.stat.timeline.open
656+
657+
if (connectionALifespan > connectionBLifespan) {
658+
return 1
659+
}
660+
661+
if (connectionALifespan < connectionBLifespan) {
662+
return -1
663+
}
664+
653665
return 0
654666
})
655667

test/connection-manager/index.spec.ts

+83-14
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
/* eslint-env mocha */
22

3-
import { expect } from 'aegir/chai'
4-
import sinon from 'sinon'
5-
import { createNode } from '../utils/creators/peer.js'
6-
import { createBaseOptions } from '../utils/base-options.browser.js'
7-
import type { Libp2pNode } from '../../src/libp2p.js'
8-
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
9-
import { mockConnection, mockDuplex, mockMultiaddrConnection, mockMetrics } from '@libp2p/interface-mocks'
10-
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
11-
import { CustomEvent } from '@libp2p/interfaces/events'
3+
import type { Connection } from '@libp2p/interface-connection'
4+
import type { Dialer } from '@libp2p/interface-connection-manager'
5+
import { mockConnection, mockDuplex, mockMetrics, mockMultiaddrConnection } from '@libp2p/interface-mocks'
6+
import type { PeerStore } from '@libp2p/interface-peer-store'
127
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
13-
import pWaitFor from 'p-wait-for'
8+
import type { Upgrader } from '@libp2p/interface-transport'
9+
import { CustomEvent } from '@libp2p/interfaces/events'
10+
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
1411
import { multiaddr } from '@multiformats/multiaddr'
12+
import { expect } from 'aegir/chai'
13+
import pWaitFor from 'p-wait-for'
14+
import sinon from 'sinon'
1515
import { stubInterface } from 'sinon-ts'
16-
import type { Dialer } from '@libp2p/interface-connection-manager'
17-
import type { Connection } from '@libp2p/interface-connection'
18-
import type { Upgrader } from '@libp2p/interface-transport'
19-
import type { PeerStore } from '@libp2p/interface-peer-store'
16+
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
17+
import type { Libp2pNode } from '../../src/libp2p.js'
18+
import { createBaseOptions } from '../utils/base-options.browser.js'
19+
import { createNode } from '../utils/creators/peer.js'
2020

2121
const defaultOptions = {
2222
maxConnections: 10,
@@ -114,6 +114,75 @@ describe('Connection Manager', () => {
114114
expect(lowestSpy).to.have.property('callCount', 1)
115115
})
116116

117+
it('should close shortest-lived connection if the tag values are equal', async () => {
118+
const max = 5
119+
libp2p = await createNode({
120+
config: createBaseOptions({
121+
connectionManager: {
122+
maxConnections: max,
123+
minConnections: 2
124+
}
125+
}),
126+
started: false
127+
})
128+
129+
await libp2p.start()
130+
131+
const connectionManager = libp2p.connectionManager as DefaultConnectionManager
132+
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_pruneConnections')
133+
const spies = new Map<number, sinon.SinonSpy<[], Promise<void>>>()
134+
135+
const createConnection = async (value: number) => {
136+
// #TODO: Mock the connection timeline to simulate an older connection
137+
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
138+
const spy = sinon.spy(connection, 'close')
139+
140+
// The lowest tag value will have the longest connection
141+
spies.set(value, spy)
142+
await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', {
143+
value
144+
})
145+
146+
await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
147+
}
148+
149+
for (let i = 0; i < max; i++) {
150+
const value = i * 10
151+
await createConnection(value)
152+
}
153+
154+
// The lowest tag value will have the longest connection
155+
const value = 0 * 10
156+
157+
// Add 1 connection too many with the lowest tag value
158+
await createConnection(value)
159+
160+
// Sort the spy keys
161+
const sortedSpyKeys = Array.from(spies.keys()).sort((a, b) => {
162+
if (a > b) {
163+
return 1
164+
}
165+
166+
if (a < b) {
167+
return -1
168+
}
169+
170+
return 0
171+
})
172+
173+
// get the lowest tagged value, but this would be also the longest lived connection
174+
const lowestTag = sortedSpyKeys[0]
175+
const lowestTagSpy = spies.get(lowestTag)
176+
177+
// Get shortest-lived connection with the lowest tag
178+
const shortestLivedWithLowestTag = sortedSpyKeys[1]
179+
const shortestLivedSpy = spies.get(shortestLivedWithLowestTag)
180+
181+
expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1)
182+
expect(lowestTagSpy).to.have.property('callCount', 0)
183+
expect(shortestLivedSpy).to.have.property('callCount', 1)
184+
})
185+
117186
it('should close connection when the maximum has been reached even without tags', async () => {
118187
const max = 5
119188
libp2p = await createNode({

0 commit comments

Comments
 (0)