Skip to content

Commit 528d737

Browse files
authored
fix: replace p-queue with less restrictive queue (#2339)
Adds a `Queue` class to `@libp2p/utils` modelled on p-queue with a few key differences: 1. The queue is externally accessible so we can modify it before jobs run 2. It can be turned into an async generator 3. Jobs remain in the queue while they are executing for better introspection 4. It integrates with libp2p metrics, if desired The dial queue has been replaced with this new queue class, this means we don't need to maintain a separate internal queue for pending dials since the dial queue itself is accessible.
1 parent 581574d commit 528d737

File tree

19 files changed

+1469
-553
lines changed

19 files changed

+1469
-553
lines changed

packages/interface/src/errors.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,22 @@ export class CodeError<T extends Record<string, any> = Record<string, never>> ex
3333
}
3434
}
3535

36+
export class AggregateCodeError<T extends Record<string, any> = Record<string, never>> extends AggregateError {
37+
public readonly props: T
38+
39+
constructor (
40+
errors: Error[],
41+
message: string,
42+
public readonly code: string,
43+
props?: T
44+
) {
45+
super(errors, message)
46+
47+
this.name = props?.name ?? 'AggregateCodeError'
48+
this.props = props ?? {} as T // eslint-disable-line @typescript-eslint/consistent-type-assertions
49+
}
50+
}
51+
3652
export class UnexpectedPeerError extends Error {
3753
public code: string
3854

packages/interface/src/event-target.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export class TypedEventEmitter<EventMap extends Record<string, any>> extends Eve
9090
return result
9191
}
9292

93-
safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail>): boolean {
93+
safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail> = {}): boolean {
9494
return this.dispatchEvent(new CustomEvent<Detail>(type as string, detail))
9595
}
9696
}

packages/kad-dht/src/routing-table/index.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { CodeError, TypedEventEmitter } from '@libp2p/interface'
22
import { PeerSet } from '@libp2p/peer-collections'
3-
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
3+
import { PeerQueue } from '@libp2p/utils/peer-queue'
44
import { pbStream } from 'it-protobuf-stream'
55
import { Message, MessageType } from '../message/dht.js'
66
import * as utils from '../utils.js'
@@ -44,7 +44,7 @@ export interface RoutingTableEvents {
4444
export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implements Startable {
4545
public kBucketSize: number
4646
public kb?: KBucket
47-
public pingQueue: PeerJobQueue
47+
public pingQueue: PeerQueue<boolean>
4848

4949
private readonly log: Logger
5050
private readonly components: RoutingTableComponents
@@ -56,8 +56,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
5656
private readonly tagValue: number
5757
private readonly metrics?: {
5858
routingTableSize: Metric
59-
pingQueueSize: Metric
60-
pingRunning: Metric
6159
}
6260

6361
constructor (components: RoutingTableComponents, init: RoutingTableInit) {
@@ -75,23 +73,18 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
7573
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
7674
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE
7775

78-
const updatePingQueueSizeMetric = (): void => {
79-
this.metrics?.pingQueueSize.update(this.pingQueue.size)
80-
this.metrics?.pingRunning.update(this.pingQueue.pending)
81-
}
82-
83-
this.pingQueue = new PeerJobQueue({ concurrency: this.pingConcurrency })
84-
this.pingQueue.addListener('add', updatePingQueueSizeMetric)
85-
this.pingQueue.addListener('next', updatePingQueueSizeMetric)
86-
this.pingQueue.addListener('error', err => {
87-
this.log.error('error pinging peer', err)
76+
this.pingQueue = new PeerQueue({
77+
concurrency: this.pingConcurrency,
78+
metricName: `${logPrefix.replaceAll(':', '_')}_ping_queue`,
79+
metrics: this.components.metrics
80+
})
81+
this.pingQueue.addEventListener('error', evt => {
82+
this.log.error('error pinging peer', evt.detail)
8883
})
8984

9085
if (this.components.metrics != null) {
9186
this.metrics = {
92-
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`),
93-
pingQueueSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_queue_size`),
94-
pingRunning: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_running`)
87+
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`)
9588
}
9689
}
9790
}
@@ -204,8 +197,10 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
204197
const results = await Promise.all(
205198
oldContacts.map(async oldContact => {
206199
// if a previous ping wants us to ping this contact, re-use the result
207-
if (this.pingQueue.hasJob(oldContact.peer)) {
208-
return this.pingQueue.joinJob(oldContact.peer)
200+
const pingJob = this.pingQueue.find(oldContact.peer)
201+
202+
if (pingJob != null) {
203+
return pingJob.join()
209204
}
210205

211206
return this.pingQueue.add(async () => {

packages/libp2p/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@
111111
"merge-options": "^3.0.4",
112112
"multiformats": "^13.0.0",
113113
"p-defer": "^4.0.0",
114-
"p-queue": "^8.0.0",
115114
"private-ip": "^3.0.1",
116115
"rate-limiter-flexible": "^4.0.0",
117116
"uint8arraylist": "^2.4.3",

packages/libp2p/src/connection-manager/auto-dial.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
2-
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
2+
import { PeerQueue } from '@libp2p/utils/peer-queue'
33
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
44
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
5-
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable } from '@libp2p/interface'
5+
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface'
66
import type { ConnectionManager } from '@libp2p/interface-internal'
77

88
interface AutoDialInit {
@@ -20,6 +20,7 @@ interface AutoDialComponents {
2020
peerStore: PeerStore
2121
events: TypedEventTarget<Libp2pEvents>
2222
logger: ComponentLogger
23+
metrics?: Metrics
2324
}
2425

2526
const defaultOptions = {
@@ -35,7 +36,7 @@ const defaultOptions = {
3536
export class AutoDial implements Startable {
3637
private readonly connectionManager: ConnectionManager
3738
private readonly peerStore: PeerStore
38-
private readonly queue: PeerJobQueue
39+
private readonly queue: PeerQueue<void>
3940
private readonly minConnections: number
4041
private readonly autoDialPriority: number
4142
private readonly autoDialIntervalMs: number
@@ -64,11 +65,13 @@ export class AutoDial implements Startable {
6465
this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial')
6566
this.started = false
6667
this.running = false
67-
this.queue = new PeerJobQueue({
68-
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency
68+
this.queue = new PeerQueue({
69+
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency,
70+
metricName: 'libp2p_autodial_queue',
71+
metrics: components.metrics
6972
})
70-
this.queue.addListener('error', (err) => {
71-
this.log.error('error during auto-dial', err)
73+
this.queue.addEventListener('error', (evt) => {
74+
this.log.error('error during auto-dial', evt.detail)
7275
})
7376

7477
// check the min connection limit whenever a peer disconnects
@@ -179,7 +182,7 @@ export class AutoDial implements Startable {
179182
}
180183

181184
// remove peers already in the autodial queue
182-
if (this.queue.hasJob(peer.id)) {
185+
if (this.queue.has(peer.id)) {
183186
this.log.trace('not autodialing %p because they are already being autodialed', peer.id)
184187
return false
185188
}

0 commit comments

Comments
 (0)