Skip to content

Commit 657760f

Browse files
committed
fix: invoke onProgress callback with DHT queries during routing
Allow passing an `onProgress` callback to the peer/content routers that can receive DHT query events. Refs #1574
1 parent 6abcd22 commit 657760f

File tree

8 files changed

+182
-33
lines changed

8 files changed

+182
-33
lines changed

packages/interface/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
"it-stream-types": "^2.0.1",
164164
"multiformats": "^12.0.1",
165165
"p-defer": "^4.0.0",
166+
"progress-events": "^1.0.0",
166167
"uint8arraylist": "^2.4.3"
167168
},
168169
"devDependencies": {

packages/interface/src/content-routing/index.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AbortOptions } from '../index.js'
22
import type { PeerInfo } from '../peer-info/index.js'
33
import type { CID } from 'multiformats/cid'
4+
import type { ProgressEvent, ProgressOptions } from 'progress-events'
45

56
/**
67
* Any object that implements this Symbol as a property should return a
@@ -23,7 +24,12 @@ import type { CID } from 'multiformats/cid'
2324
*/
2425
export const contentRouting = Symbol.for('@libp2p/content-routing')
2526

26-
export interface ContentRouting {
27+
export interface ContentRouting<
28+
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
29+
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
30+
PutProgressEvents extends ProgressEvent = ProgressEvent,
31+
GetProgressEvents extends ProgressEvent = ProgressEvent
32+
> {
2733
/**
2834
* The implementation of this method should ensure that network peers know the
2935
* caller can provide content that corresponds to the passed CID.
@@ -35,7 +41,7 @@ export interface ContentRouting {
3541
* await contentRouting.provide(cid)
3642
* ```
3743
*/
38-
provide: (cid: CID, options?: AbortOptions) => Promise<void>
44+
provide: (cid: CID, options?: AbortOptions & ProgressOptions<ProvideProgressEvents>) => Promise<void>
3945

4046
/**
4147
* Find the providers of the passed CID.
@@ -49,7 +55,7 @@ export interface ContentRouting {
4955
* }
5056
* ```
5157
*/
52-
findProviders: (cid: CID, options?: AbortOptions) => AsyncIterable<PeerInfo>
58+
findProviders: (cid: CID, options?: AbortOptions & ProgressOptions<FindProvidersProgressEvents>) => AsyncIterable<PeerInfo>
5359

5460
/**
5561
* Puts a value corresponding to the passed key in a way that can later be
@@ -65,7 +71,7 @@ export interface ContentRouting {
6571
* await contentRouting.put(key, value)
6672
* ```
6773
*/
68-
put: (key: Uint8Array, value: Uint8Array, options?: AbortOptions) => Promise<void>
74+
put: (key: Uint8Array, value: Uint8Array, options?: AbortOptions & ProgressOptions<PutProgressEvents>) => Promise<void>
6975

7076
/**
7177
* Retrieves a value from the network corresponding to the passed key.
@@ -79,5 +85,5 @@ export interface ContentRouting {
7985
* const value = await contentRouting.get(key)
8086
* ```
8187
*/
82-
get: (key: Uint8Array, options?: AbortOptions) => Promise<Uint8Array>
88+
get: (key: Uint8Array, options?: AbortOptions & ProgressOptions<GetProgressEvents>) => Promise<Uint8Array>
8389
}

packages/interface/src/index.ts

+48-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import type { StreamHandler, StreamHandlerOptions } from './stream-handler/index
2828
import type { Topology } from './topology/index.js'
2929
import type { Listener } from './transport/index.js'
3030
import type { Multiaddr } from '@multiformats/multiaddr'
31+
import type { ProgressEvent } from 'progress-events'
3132

3233
/**
3334
* Used by the connection manager to sort addresses into order before dialling
@@ -108,7 +109,15 @@ export interface IdentifyResult {
108109
* Event names are `noun:verb` so the first part is the name of the object
109110
* being acted on and the second is the action.
110111
*/
111-
export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
112+
export interface Libp2pEvents<
113+
Services extends ServiceMap = ServiceMap,
114+
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
115+
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
116+
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
117+
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
118+
PutProgressEvents extends ProgressEvent = ProgressEvent,
119+
GetProgressEvents extends ProgressEvent = ProgressEvent
120+
> {
112121
/**
113122
* This event is dispatched when a new network peer is discovered.
114123
*
@@ -235,7 +244,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
235244
* })
236245
* ```
237246
*/
238-
'start': CustomEvent<Libp2p<T>>
247+
'start': CustomEvent<Libp2p<
248+
Services,
249+
FindPeerProgressEvents,
250+
GetClosestPeersProgressEvents,
251+
ProvideProgressEvents,
252+
FindProvidersProgressEvents,
253+
PutProgressEvents,
254+
GetProgressEvents
255+
>>
239256

240257
/**
241258
* This event notifies listeners that the node has stopped
@@ -246,7 +263,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
246263
* })
247264
* ```
248265
*/
249-
'stop': CustomEvent<Libp2p<T>>
266+
'stop': CustomEvent<Libp2p<
267+
Services,
268+
FindPeerProgressEvents,
269+
GetClosestPeersProgressEvents,
270+
ProvideProgressEvents,
271+
FindProvidersProgressEvents,
272+
PutProgressEvents,
273+
GetProgressEvents
274+
>>
250275
}
251276

252277
/**
@@ -303,7 +328,23 @@ export interface PendingDial {
303328
/**
304329
* Libp2p nodes implement this interface.
305330
*/
306-
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, EventEmitter<Libp2pEvents<T>> {
331+
export interface Libp2p<
332+
Services extends ServiceMap = ServiceMap,
333+
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
334+
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
335+
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
336+
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
337+
PutProgressEvents extends ProgressEvent = ProgressEvent,
338+
GetProgressEvents extends ProgressEvent = ProgressEvent
339+
> extends Startable, EventEmitter<Libp2pEvents<
340+
Services,
341+
FindPeerProgressEvents,
342+
GetClosestPeersProgressEvents,
343+
ProvideProgressEvents,
344+
FindProvidersProgressEvents,
345+
PutProgressEvents,
346+
GetProgressEvents
347+
>> {
307348
/**
308349
* The PeerId is a unique identifier for a node on the network.
309350
*
@@ -354,7 +395,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ev
354395
* }
355396
* ```
356397
*/
357-
peerRouting: PeerRouting
398+
peerRouting: PeerRouting<FindPeerProgressEvents, GetClosestPeersProgressEvents>
358399

359400
/**
360401
* The content routing subsystem allows the user to find providers for content,
@@ -370,7 +411,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ev
370411
* }
371412
* ```
372413
*/
373-
contentRouting: ContentRouting
414+
contentRouting: ContentRouting<ProvideProgressEvents, FindProvidersProgressEvents, GetProgressEvents, PutProgressEvents>
374415

375416
/**
376417
* The keychain contains the keys used by the current node, and can create new
@@ -597,7 +638,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ev
597638
/**
598639
* A set of user defined services
599640
*/
600-
services: T
641+
services: Services
601642
}
602643

603644
/**

packages/interface/src/peer-routing/index.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AbortOptions } from '../index.js'
22
import type { PeerId } from '../peer-id/index.js'
33
import type { PeerInfo } from '../peer-info/index.js'
4+
import type { ProgressEvent, ProgressOptions } from 'progress-events'
45

56
/**
67
* Any object that implements this Symbol as a property should return a
@@ -23,7 +24,10 @@ import type { PeerInfo } from '../peer-info/index.js'
2324
*/
2425
export const peerRouting = Symbol.for('@libp2p/peer-routing')
2526

26-
export interface PeerRouting {
27+
export interface PeerRouting<
28+
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
29+
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent
30+
> {
2731
/**
2832
* Searches the network for peer info corresponding to the passed peer id.
2933
*
@@ -34,7 +38,7 @@ export interface PeerRouting {
3438
* const peer = await peerRouting.findPeer(peerId, options)
3539
* ```
3640
*/
37-
findPeer: (peerId: PeerId, options?: AbortOptions) => Promise<PeerInfo>
41+
findPeer: (peerId: PeerId, options?: AbortOptions & ProgressOptions<FindPeerProgressEvents>) => Promise<PeerInfo>
3842

3943
/**
4044
* Search the network for peers that are closer to the passed key. Peer
@@ -49,5 +53,5 @@ export interface PeerRouting {
4953
* }
5054
* ```
5155
*/
52-
getClosestPeers: (key: Uint8Array, options?: AbortOptions) => AsyncIterable<PeerInfo>
56+
getClosestPeers: (key: Uint8Array, options?: AbortOptions & ProgressOptions<GetClosestPeersProgressEvents>) => AsyncIterable<PeerInfo>
5357
}

packages/kad-dht/src/dual-kad-dht.ts

+50-11
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,73 @@ import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
44
import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery'
55
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
66
import { logger } from '@libp2p/logger'
7-
import drain from 'it-drain'
87
import merge from 'it-merge'
98
import isPrivate from 'private-ip'
9+
import { CustomProgressEvent } from 'progress-events'
1010
import { DefaultKadDHT } from './kad-dht.js'
1111
import { queryErrorEvent } from './query/events.js'
1212
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
1313
import type { PeerId } from '@libp2p/interface/peer-id'
1414
import type { PeerInfo } from '@libp2p/interface/peer-info'
1515
import type { Multiaddr } from '@multiformats/multiaddr'
1616
import type { CID } from 'multiformats/cid'
17+
import type { ProgressEvent, ProgressOptions } from 'progress-events'
18+
19+
export type ProvideProgressEvents =
20+
ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent>
21+
22+
export type FindProvidersProgressEvents =
23+
ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent>
24+
25+
export type PutProgressEvents =
26+
ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent>
27+
28+
export type GetProgressEvents =
29+
ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent>
1730

1831
const log = logger('libp2p:kad-dht')
1932

2033
/**
2134
* Wrapper class to convert events into returned values
2235
*/
23-
class DHTContentRouting implements ContentRouting {
36+
class DHTContentRouting implements ContentRouting<
37+
ProvideProgressEvents,
38+
FindProvidersProgressEvents,
39+
PutProgressEvents,
40+
GetProgressEvents
41+
> {
2442
private readonly dht: KadDHT
2543

2644
constructor (dht: KadDHT) {
2745
this.dht = dht
2846
}
2947

30-
async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
31-
await drain(this.dht.provide(cid, options))
48+
async provide (cid: CID, options: QueryOptions & ProgressOptions<ProvideProgressEvents> = {}): Promise<void> {
49+
for await (const event of this.dht.provide(cid, options)) {
50+
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event))
51+
}
3252
}
3353

34-
async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
54+
async * findProviders (cid: CID, options: QueryOptions & ProgressOptions<FindProvidersProgressEvents> = {}): AsyncGenerator<PeerInfo, void, undefined> {
3555
for await (const event of this.dht.findProviders(cid, options)) {
56+
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event))
57+
3658
if (event.name === 'PROVIDER') {
3759
yield * event.providers
3860
}
3961
}
4062
}
4163

42-
async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
43-
await drain(this.dht.put(key, value, options))
64+
async put (key: Uint8Array, value: Uint8Array, options: QueryOptions & ProgressOptions<PutProgressEvents> = {}): Promise<void> {
65+
for await (const event of this.dht.put(key, value, options)) {
66+
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event))
67+
}
4468
}
4569

46-
async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
70+
async get (key: Uint8Array, options: QueryOptions & ProgressOptions<GetProgressEvents> = {}): Promise<Uint8Array> {
4771
for await (const event of this.dht.get(key, options)) {
72+
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event))
73+
4874
if (event.name === 'VALUE') {
4975
return event.value
5076
}
@@ -54,18 +80,29 @@ class DHTContentRouting implements ContentRouting {
5480
}
5581
}
5682

83+
export type FindPeerProgressEvents =
84+
ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent>
85+
86+
export type GetClosestPeersProgressEvents =
87+
ProgressEvent<'libp2p:peer-routing:get-closest-peers:dht:event', QueryEvent>
88+
5789
/**
5890
* Wrapper class to convert events into returned values
5991
*/
60-
class DHTPeerRouting implements PeerRouting {
92+
class DHTPeerRouting implements PeerRouting<
93+
FindPeerProgressEvents,
94+
GetClosestPeersProgressEvents
95+
> {
6196
private readonly dht: KadDHT
6297

6398
constructor (dht: KadDHT) {
6499
this.dht = dht
65100
}
66101

67-
async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
102+
async findPeer (peerId: PeerId, options: QueryOptions & ProgressOptions<FindPeerProgressEvents> = {}): Promise<PeerInfo> {
68103
for await (const event of this.dht.findPeer(peerId, options)) {
104+
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event))
105+
69106
if (event.name === 'FINAL_PEER') {
70107
return event.peer
71108
}
@@ -74,8 +111,10 @@ class DHTPeerRouting implements PeerRouting {
74111
throw new CodeError('Not found', 'ERR_NOT_FOUND')
75112
}
76113

77-
async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
114+
async * getClosestPeers (key: Uint8Array, options: QueryOptions & ProgressOptions<GetClosestPeersProgressEvents> = {}): AsyncIterable<PeerInfo> {
78115
for await (const event of this.dht.getClosestPeers(key, options)) {
116+
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peers:dht:event', event))
117+
79118
if (event.name === 'FINAL_PEER') {
80119
yield event.peer
81120
}

packages/libp2p/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
"p-queue": "^7.3.4",
159159
"p-retry": "^5.0.0",
160160
"private-ip": "^3.0.0",
161+
"progress-events": "^1.0.0",
161162
"protons-runtime": "^5.0.0",
162163
"rate-limiter-flexible": "^2.3.11",
163164
"uint8arraylist": "^2.4.3",

0 commit comments

Comments
 (0)