Skip to content

Commit 9eff7ef

Browse files
authored
feat: allow joining jobs in peer queues (#2316)
When we have a PeerJobQueue, the jobs are usually expensive but the result is transferable - opening a connection, etc. This PR adds a `joinJob` method to `PeerJobQueue` - if a job is queued for the passed `PeerId`, a promise is returned that resolves/rejects based on the outcome of the job.
1 parent f81be14 commit 9eff7ef

File tree

4 files changed

+132
-11
lines changed

4 files changed

+132
-11
lines changed

packages/utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
"dependencies": {
113113
"@chainsafe/is-ip": "^2.0.2",
114114
"@libp2p/interface": "^1.0.2",
115+
"@libp2p/peer-collections": "^5.1.1",
115116
"@multiformats/multiaddr": "^12.1.10",
116117
"@multiformats/multiaddr-matcher": "^1.1.0",
117118
"get-iterator": "^2.0.1",

packages/utils/src/peer-job-queue.ts

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
/* eslint-disable @typescript-eslint/no-non-null-assertion */
22

33
import { CodeError, ERR_INVALID_PARAMETERS } from '@libp2p/interface'
4+
import { PeerMap } from '@libp2p/peer-collections'
5+
import pDefer from 'p-defer'
46
import PQueue from 'p-queue'
57
import type { PeerId } from '@libp2p/interface'
8+
import type { AbortOptions } from 'it-pushable'
9+
import type { DeferredPromise } from 'p-defer'
610
import type { QueueAddOptions, Options, Queue } from 'p-queue'
711

812
// Port of lower_bound from https://en.cppreference.com/w/cpp/algorithm/lower_bound
@@ -26,7 +30,9 @@ function lowerBound<T> (array: readonly T[], value: T, comparator: (a: T, b: T)
2630
return first
2731
}
2832

29-
interface RunFunction { (): Promise<unknown> }
33+
interface RunFunction<T> {
34+
(options?: AbortOptions): Promise<T>
35+
}
3036

3137
export interface PeerPriorityQueueOptions extends QueueAddOptions {
3238
peerId: PeerId
@@ -35,17 +41,17 @@ export interface PeerPriorityQueueOptions extends QueueAddOptions {
3541
interface PeerJob {
3642
priority: number
3743
peerId: PeerId
38-
run: RunFunction
44+
run: RunFunction<any>
3945
}
4046

4147
/**
4248
* Port of https://github.com/sindresorhus/p-queue/blob/main/source/priority-queue.ts
4349
* that adds support for filtering jobs by peer id
4450
*/
45-
class PeerPriorityQueue implements Queue<RunFunction, PeerPriorityQueueOptions> {
51+
class PeerPriorityQueue implements Queue<RunFunction<unknown>, PeerPriorityQueueOptions> {
4652
readonly #queue: PeerJob[] = []
4753

48-
enqueue (run: RunFunction, options?: Partial<PeerPriorityQueueOptions>): void {
54+
enqueue (run: RunFunction<unknown>, options?: Partial<PeerPriorityQueueOptions>): void {
4955
const peerId = options?.peerId
5056
const priority = options?.priority ?? 0
5157

@@ -71,23 +77,23 @@ class PeerPriorityQueue implements Queue<RunFunction, PeerPriorityQueueOptions>
7177
this.#queue.splice(index, 0, element)
7278
}
7379

74-
dequeue (): RunFunction | undefined {
80+
dequeue (): RunFunction<unknown> | undefined {
7581
const item = this.#queue.shift()
7682
return item?.run
7783
}
7884

79-
filter (options: Readonly<Partial<PeerPriorityQueueOptions>>): RunFunction[] {
85+
filter (options: Readonly<Partial<PeerPriorityQueueOptions>>): Array<RunFunction<unknown>> {
8086
if (options.peerId != null) {
8187
const peerId = options.peerId
8288

8389
return this.#queue.filter(
8490
(element: Readonly<PeerPriorityQueueOptions>) => peerId.equals(element.peerId)
85-
).map((element: Readonly<{ run: RunFunction }>) => element.run)
91+
).map((element: Readonly<{ run: RunFunction<unknown> }>) => element.run)
8692
}
8793

8894
return this.#queue.filter(
8995
(element: Readonly<PeerPriorityQueueOptions>) => element.priority === options.priority
90-
).map((element: Readonly<{ run: RunFunction }>) => element.run)
96+
).map((element: Readonly<{ run: RunFunction<unknown> }>) => element.run)
9197
}
9298

9399
get size (): number {
@@ -99,20 +105,78 @@ class PeerPriorityQueue implements Queue<RunFunction, PeerPriorityQueueOptions>
99105
* Extends PQueue to add support for querying queued jobs by peer id
100106
*/
101107
export class PeerJobQueue extends PQueue<PeerPriorityQueue, PeerPriorityQueueOptions> {
108+
private readonly results: PeerMap<DeferredPromise<any> | true>
109+
102110
constructor (options: Options<PeerPriorityQueue, PeerPriorityQueueOptions> = {}) {
103111
super({
104112
...options,
105113
queueClass: PeerPriorityQueue
106114
})
115+
116+
this.results = new PeerMap()
107117
}
108118

109119
/**
110-
* Returns true if this queue has a job for the passed peer id that has not yet
111-
* started to run
120+
* Returns true if this queue has a job for the passed peer id that has not
121+
* yet started to run
112122
*/
113123
hasJob (peerId: PeerId): boolean {
114124
return this.sizeBy({
115125
peerId
116126
}) > 0
117127
}
128+
129+
/**
130+
* Returns a promise for the result of the job in the queue for the passed
131+
* peer id.
132+
*/
133+
async joinJob <Result = void> (peerId: PeerId): Promise<Result> {
134+
let deferred = this.results.get(peerId)
135+
136+
if (deferred == null) {
137+
throw new CodeError('No job found for peer id', 'ERR_NO_JOB_FOR_PEER_ID')
138+
}
139+
140+
if (deferred === true) {
141+
// a job has been added but so far nothing has tried to join the job
142+
deferred = pDefer<Result>()
143+
this.results.set(peerId, deferred)
144+
}
145+
146+
return deferred.promise
147+
}
148+
149+
async add <T> (fn: RunFunction<T>, opts: PeerPriorityQueueOptions): Promise<T> {
150+
const peerId = opts?.peerId
151+
152+
if (peerId == null) {
153+
throw new CodeError('missing peer id', ERR_INVALID_PARAMETERS)
154+
}
155+
156+
this.results.set(opts.peerId, true)
157+
158+
return super.add(async (opts?: AbortOptions) => {
159+
try {
160+
const value = await fn(opts)
161+
162+
const deferred = this.results.get(peerId)
163+
164+
if (deferred != null && deferred !== true) {
165+
deferred.resolve(value)
166+
}
167+
168+
return value
169+
} catch (err) {
170+
const deferred = this.results.get(peerId)
171+
172+
if (deferred != null && deferred !== true) {
173+
deferred.reject(err)
174+
}
175+
176+
throw err
177+
} finally {
178+
this.results.delete(peerId)
179+
}
180+
}, opts) as Promise<T>
181+
}
118182
}

packages/utils/src/stream-to-ma-conn.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect
4444
// If the source errored the socket will already have been destroyed by
4545
// toIterable.duplex(). If the socket errored it will already be
4646
// destroyed. There's nothing to do here except log the error & return.
47-
log(err)
47+
log.error('%s error in sink', remoteAddr, err)
4848
}
4949
} finally {
5050
closedWrite = true

packages/utils/test/peer-job-queue.spec.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,60 @@ describe('peer job queue', () => {
3737

3838
expect(queue.hasJob(peerIdA)).to.be.false()
3939
})
40+
41+
it('can join existing jobs', async () => {
42+
const value = 'hello world'
43+
const deferred = pDefer<string>()
44+
45+
const peerIdA = await createEd25519PeerId()
46+
const queue = new PeerJobQueue({
47+
concurrency: 1
48+
})
49+
50+
expect(queue.hasJob(peerIdA)).to.be.false()
51+
52+
await expect(queue.joinJob(peerIdA)).to.eventually.rejected
53+
.with.property('code', 'ERR_NO_JOB_FOR_PEER_ID')
54+
55+
void queue.add(async () => {
56+
return deferred.promise
57+
}, {
58+
peerId: peerIdA
59+
})
60+
61+
const join = queue.joinJob<string>(peerIdA)
62+
63+
deferred.resolve(value)
64+
65+
await expect(join).to.eventually.equal(value)
66+
67+
expect(queue.hasJob(peerIdA)).to.be.false()
68+
69+
await expect(queue.joinJob(peerIdA)).to.eventually.rejected
70+
.with.property('code', 'ERR_NO_JOB_FOR_PEER_ID')
71+
})
72+
73+
it('can join an existing job that fails', async () => {
74+
const error = new Error('nope!')
75+
const deferred = pDefer<string>()
76+
77+
const peerIdA = await createEd25519PeerId()
78+
const queue = new PeerJobQueue({
79+
concurrency: 1
80+
})
81+
82+
void queue.add(async () => {
83+
return deferred.promise
84+
}, {
85+
peerId: peerIdA
86+
})
87+
.catch(() => {})
88+
89+
const joinedJob = queue.joinJob(peerIdA)
90+
91+
deferred.reject(error)
92+
93+
await expect(joinedJob).to.eventually.rejected
94+
.with.property('message', error.message)
95+
})
4096
})

0 commit comments

Comments
 (0)