Skip to content

Commit 185b23e

Browse files
authored
feat: add path index to events (#3102)
KAD-DHT queries operate on disjoint paths which can make following query events hard. Add a `.path` property to events with the numeric index of the disjoint path the query is following, this makes reconciling events in progress handlers much easier.
1 parent 4b8c0a6 commit 185b23e

35 files changed

+485
-541
lines changed

packages/kad-dht/src/content-fetching/index.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { verifyRecord } from '../record/validators.js'
1818
import { createPutRecord, bufferToRecordKey } from '../utils.js'
1919
import type { KadDHTComponents, Validators, Selectors, ValueEvent, QueryEvent } from '../index.js'
2020
import type { Message } from '../message/dht.js'
21-
import type { Network } from '../network.js'
21+
import type { Network, SendMessageOptions } from '../network.js'
2222
import type { PeerRouting } from '../peer-routing/index.js'
2323
import type { QueryManager } from '../query/manager.js'
2424
import type { QueryFunc } from '../query/types.js'
@@ -88,7 +88,7 @@ export class ContentFetching {
8888
/**
8989
* Send the best record found to any peers that have an out of date record
9090
*/
91-
async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> {
91+
async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: SendMessageOptions): AsyncGenerator<QueryEvent> {
9292
this.log('sendCorrection for %b', key)
9393
const fixupRec = createPutRecord(key, best)
9494

@@ -139,7 +139,7 @@ export class ContentFetching {
139139
/**
140140
* Store the given key/value pair in the DHT
141141
*/
142-
async * put (key: Uint8Array, value: Uint8Array, options: RoutingOptions = {}): AsyncGenerator<unknown, void, undefined> {
142+
async * put (key: Uint8Array, value: Uint8Array, options: RoutingOptions): AsyncGenerator<unknown, void, undefined> {
143143
this.log('put key %b value %b', key, value)
144144

145145
// create record in the dht format
@@ -171,7 +171,10 @@ export class ContentFetching {
171171
}
172172

173173
this.log('send put to %p', event.peer.id)
174-
for await (const putEvent of this.network.sendRequest(event.peer.id, msg, options)) {
174+
for await (const putEvent of this.network.sendRequest(event.peer.id, msg, {
175+
...options,
176+
path: event.path
177+
})) {
175178
events.push(putEvent)
176179

177180
if (putEvent.name !== 'PEER_RESPONSE') {
@@ -201,7 +204,7 @@ export class ContentFetching {
201204
/**
202205
* Get the value to the given key
203206
*/
204-
async * get (key: Uint8Array, options: RoutingOptions = {}): AsyncGenerator<QueryEvent | ValueEvent> {
207+
async * get (key: Uint8Array, options: RoutingOptions): AsyncGenerator<QueryEvent | ValueEvent> {
205208
this.log('get %b', key)
206209

207210
const vals: ValueEvent[] = []
@@ -237,7 +240,10 @@ export class ContentFetching {
237240
throw new NotFoundError('Best value was not found')
238241
}
239242

240-
yield * this.sendCorrectionRecord(key, vals, best, options)
243+
yield * this.sendCorrectionRecord(key, vals, best, {
244+
...options,
245+
path: -1
246+
})
241247

242248
yield vals[i]
243249
}
@@ -261,10 +267,11 @@ export class ContentFetching {
261267

262268
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
263269

264-
const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
270+
const getValueQuery: QueryFunc = async function * ({ peer, signal, path }) {
265271
for await (const event of self.peerRouting.getValueOrPeers(peer, key, {
266272
...options,
267-
signal
273+
signal,
274+
path
268275
})) {
269276
yield event
270277

packages/kad-dht/src/content-routing/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ export class ContentRouting {
112112
try {
113113
this.log('sending provider record for %s to %p', key, event.peer.id)
114114

115-
for await (const sendEvent of this.network.sendMessage(event.peer.id, msg, options)) {
115+
for await (const sendEvent of this.network.sendMessage(event.peer.id, msg, {
116+
...options,
117+
path: event.path ?? -1
118+
})) {
116119
if (sendEvent.name === 'PEER_RESPONSE') {
117120
this.log('sent provider record for %s to %p', key, event.peer.id)
118121
sent++
@@ -181,7 +184,7 @@ export class ContentRouting {
181184
}
182185
}
183186

184-
yield peerResponseEvent({ from: this.components.peerId, messageType: MessageType.GET_PROVIDERS, providers }, options)
187+
yield peerResponseEvent({ from: this.components.peerId, messageType: MessageType.GET_PROVIDERS, providers, path: -1 }, options)
185188
yield providerEvent({ from: this.components.peerId, providers }, options)
186189

187190
found += providers.length
@@ -194,15 +197,16 @@ export class ContentRouting {
194197
/**
195198
* The query function to use on this particular disjoint path
196199
*/
197-
const findProvidersQuery: QueryFunc = async function * ({ peer, signal }) {
200+
const findProvidersQuery: QueryFunc = async function * ({ peer, signal, path }) {
198201
const request = {
199202
type: MessageType.GET_PROVIDERS,
200203
key: target
201204
}
202205

203206
yield * self.network.sendRequest(peer, request, {
204207
...options,
205-
signal
208+
signal,
209+
path
206210
})
207211
}
208212

packages/kad-dht/src/index.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ export interface SendQueryEvent {
192192
name: 'SEND_QUERY'
193193
messageName: keyof typeof MessageType
194194
messageType: MessageType
195+
path: number
195196
}
196197

197198
/**
@@ -207,6 +208,7 @@ export interface PeerResponseEvent {
207208
closer: PeerInfo[]
208209
providers: PeerInfo[]
209210
record?: DHTRecord
211+
path: number
210212
}
211213

212214
/**
@@ -217,6 +219,7 @@ export interface FinalPeerEvent {
217219
peer: PeerInfo
218220
type: EventTypes.FINAL_PEER
219221
name: 'FINAL_PEER'
222+
path: number
220223
}
221224

222225
/**
@@ -227,6 +230,7 @@ export interface QueryErrorEvent {
227230
type: EventTypes.QUERY_ERROR
228231
name: 'QUERY_ERROR'
229232
error: Error
233+
path?: number
230234
}
231235

232236
/**
@@ -256,10 +260,13 @@ export interface AddPeerEvent {
256260
type: EventTypes.ADD_PEER
257261
name: 'ADD_PEER'
258262
peer: PeerId
263+
path: number
259264
}
260265

261266
/**
262267
* Emitted when peers are dialled as part of a query
268+
*
269+
* @deprecated No longer emitted as sometimes connections are reused so it's not possible to say with certainty that a peer has been dialled
263270
*/
264271
export interface DialPeerEvent {
265272
peer: PeerId
@@ -506,8 +513,8 @@ export interface KadDHTInit {
506513
initialQuerySelfInterval?: number
507514

508515
/**
509-
* After startup by default all queries will be paused until the initial
510-
* self-query has run and there are some peers in the routing table.
516+
* After startup by default all queries will be paused until there is at least
517+
* one peer in the routing table.
511518
*
512519
* Pass true here to disable this behavior.
513520
*

packages/kad-dht/src/kad-dht.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
223223
logPrefix,
224224
metricsPrefix,
225225
initialQuerySelfHasRun,
226-
routingTable: this.routingTable
226+
routingTable: this.routingTable,
227+
allowQueryWithZeroPeers: init.allowQueryWithZeroPeers
227228
})
228229

229230
// DHT components
@@ -276,7 +277,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
276277
initialInterval: init.initialQuerySelfInterval,
277278
logPrefix,
278279
initialQuerySelfHasRun,
279-
routingTable: this.routingTable,
280280
operationMetrics
281281
})
282282
this.reprovider = new Reprovider(components, {

packages/kad-dht/src/network.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { pbStream } from 'it-protobuf-stream'
55
import { Message } from './message/dht.js'
66
import { fromPbPeerInfo } from './message/utils.js'
77
import {
8-
dialPeerEvent,
98
sendQueryEvent,
109
peerResponseEvent,
1110
queryErrorEvent
@@ -24,6 +23,15 @@ interface NetworkEvents {
2423
'peer': CustomEvent<PeerInfo>
2524
}
2625

26+
export interface SendMessageOptions extends RoutingOptions {
27+
/**
28+
* Queries involve following up to `k` disjoint paths through the network -
29+
* this option is which index within `k` this message is for, and it
30+
* allows observers to collate events together on a per-path basis
31+
*/
32+
path: number
33+
}
34+
2735
/**
2836
* Handle network operations for the dht
2937
*/
@@ -142,7 +150,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
142150
/**
143151
* Send a request and read a response
144152
*/
145-
async * sendRequest (to: PeerId, msg: Partial<Message>, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> {
153+
async * sendRequest (to: PeerId, msg: Partial<Message>, options: SendMessageOptions): AsyncGenerator<QueryEvent> {
146154
if (!this.running) {
147155
return
148156
}
@@ -154,8 +162,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
154162
}
155163

156164
this.log('sending %s to %p', msg.type, to)
157-
yield dialPeerEvent({ peer: to }, options)
158-
yield sendQueryEvent({ to, type }, options)
165+
yield sendQueryEvent({ to, type, path: options.path }, options)
159166

160167
let stream: Stream | undefined
161168
const signal = this.timeout.getTimeoutSignal(options)
@@ -183,7 +190,8 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
183190
messageType: response.type,
184191
closer: response.closer.map(fromPbPeerInfo),
185192
providers: response.providers.map(fromPbPeerInfo),
186-
record: response.record == null ? undefined : Libp2pRecord.deserialize(response.record)
193+
record: response.record == null ? undefined : Libp2pRecord.deserialize(response.record),
194+
path: options.path
187195
}, options)
188196
} catch (err: any) {
189197
this.metrics.errors?.increment({ [type]: true })
@@ -196,7 +204,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
196204
this.log.error('could not send %s to %p - %e', msg.type, to, err)
197205
}
198206

199-
yield queryErrorEvent({ from: to, error: err }, options)
207+
yield queryErrorEvent({ from: to, error: err, path: options.path }, options)
200208
} finally {
201209
this.timeout.cleanUp(signal)
202210
}
@@ -205,7 +213,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
205213
/**
206214
* Sends a message without expecting an answer
207215
*/
208-
async * sendMessage (to: PeerId, msg: Partial<Message>, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> {
216+
async * sendMessage (to: PeerId, msg: Partial<Message>, options: SendMessageOptions): AsyncGenerator<QueryEvent> {
209217
if (!this.running) {
210218
return
211219
}
@@ -217,8 +225,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
217225
}
218226

219227
this.log('sending %s to %p', msg.type, to)
220-
yield dialPeerEvent({ peer: to }, options)
221-
yield sendQueryEvent({ to, type }, options)
228+
yield sendQueryEvent({ to, type, path: options.path }, options)
222229

223230
let stream: Stream | undefined
224231
const signal = this.timeout.getTimeoutSignal(options)
@@ -242,12 +249,12 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
242249
stream?.abort(err)
243250
})
244251

245-
yield peerResponseEvent({ from: to, messageType: type }, options)
252+
yield peerResponseEvent({ from: to, messageType: type, path: options.path }, options)
246253
} catch (err: any) {
247254
this.metrics.errors?.increment({ [type]: true })
248255

249256
stream?.abort(err)
250-
yield queryErrorEvent({ from: to, error: err }, options)
257+
yield queryErrorEvent({ from: to, error: err, path: options.path }, options)
251258
} finally {
252259
this.timeout.cleanUp(signal)
253260
}

packages/kad-dht/src/peer-distance-list.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { PeerId, PeerInfo } from '@libp2p/interface'
66
interface PeerDistance {
77
peer: PeerInfo
88
distance: Uint8Array
9+
path: number
910
}
1011

1112
/**
@@ -40,30 +41,31 @@ export class PeerDistanceList {
4041
/**
4142
* The peers in the list, in order of distance from the origin key
4243
*/
43-
get peers (): PeerInfo[] {
44-
return this.peerDistances.map(pd => pd.peer)
44+
get peers (): PeerDistance[] {
45+
return [...this.peerDistances]
4546
}
4647

4748
/**
4849
* Add a peerId to the list.
4950
*/
50-
async add (peer: PeerInfo): Promise<void> {
51+
async add (peer: PeerInfo, path: number = -1): Promise<void> {
5152
const dhtKey = await convertPeerId(peer.id)
5253

53-
this.addWithKadId(peer, dhtKey)
54+
this.addWithKadId(peer, dhtKey, path)
5455
}
5556

5657
/**
5758
* Add a peerId to the list.
5859
*/
59-
addWithKadId (peer: PeerInfo, kadId: Uint8Array): void {
60+
addWithKadId (peer: PeerInfo, kadId: Uint8Array, path: number = -1): void {
6061
if (this.peerDistances.find(pd => pd.peer.id.equals(peer.id)) != null) {
6162
return
6263
}
6364

64-
const el = {
65+
const el: PeerDistance = {
6566
peer,
66-
distance: uint8ArrayXor(this.originDhtKey, kadId)
67+
distance: uint8ArrayXor(this.originDhtKey, kadId),
68+
path
6769
}
6870

6971
let added = false

0 commit comments

Comments
 (0)