Skip to content

Commit 3577af8

Browse files
authored
fix: ensure greater spec compliance (#3105)
- Increase alpha to 10 - Split starting peers into multiple disjoint buckets - Report disjoint path info on events - Abort slow subqueries at the network level instead of query and network - Use a filter to separate disjoint paths - Only add closer peers to our list of closer peers if they are actually closer
1 parent 32627c8 commit 3577af8

26 files changed

+741
-330
lines changed

packages/kad-dht/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"it-parallel": "^3.0.8",
7878
"it-pipe": "^3.0.1",
7979
"it-protobuf-stream": "^2.0.1",
80+
"it-pushable": "^3.2.3",
8081
"it-take": "^3.0.6",
8182
"mortice": "^3.0.6",
8283
"multiformats": "^13.3.1",

packages/kad-dht/src/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export const GET_MANY_RECORD_COUNT = 16
3636
export const K = 20
3737

3838
// Alpha is the concurrency for asynchronous requests
39-
export const ALPHA = 3
39+
export const ALPHA = 10
4040

4141
// How often we look for our closest DHT neighbors
4242
export const QUERY_SELF_INTERVAL = 5 * minute

packages/kad-dht/src/content-fetching/index.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export class ContentFetching {
129129
}
130130

131131
if (!sentCorrection) {
132-
yield queryErrorEvent({ from, error: new QueryError('Value not put correctly') }, options)
132+
throw new QueryError('Could not send correction')
133133
}
134134

135135
this.log.error('Failed error correcting entry')
@@ -182,7 +182,11 @@ export class ContentFetching {
182182
}
183183

184184
if (!(putEvent.record != null && uint8ArrayEquals(putEvent.record.value, Libp2pRecord.deserialize(record).value))) {
185-
events.push(queryErrorEvent({ from: event.peer.id, error: new QueryError('Value not put correctly') }, options))
185+
events.push(queryErrorEvent({
186+
from: event.peer.id,
187+
error: new QueryError('Value not put correctly'),
188+
path: putEvent.path
189+
}, options))
186190
}
187191
}
188192

@@ -212,6 +216,7 @@ export class ContentFetching {
212216
for await (const event of this.getMany(key, options)) {
213217
if (event.name === 'VALUE') {
214218
vals.push(event)
219+
continue
215220
}
216221

217222
yield event
@@ -242,7 +247,12 @@ export class ContentFetching {
242247

243248
yield * this.sendCorrectionRecord(key, vals, best, {
244249
...options,
245-
path: -1
250+
path: {
251+
index: -1,
252+
queued: 0,
253+
running: 0,
254+
total: 0
255+
}
246256
})
247257

248258
yield vals[i]
@@ -259,7 +269,13 @@ export class ContentFetching {
259269

260270
yield valueEvent({
261271
value: localRec.value,
262-
from: this.components.peerId
272+
from: this.components.peerId,
273+
path: {
274+
index: -1,
275+
running: 0,
276+
queued: 0,
277+
total: 0
278+
}
263279
}, options)
264280
} catch (err: any) {
265281
this.log('error getting local value for %b', key, err)
@@ -268,15 +284,19 @@ export class ContentFetching {
268284
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
269285

270286
const getValueQuery: QueryFunc = async function * ({ peer, signal, path }) {
271-
for await (const event of self.peerRouting.getValueOrPeers(peer, key, {
287+
for await (const event of self.peerRouting.getValueOrPeers(peer.id, key, {
272288
...options,
273289
signal,
274290
path
275291
})) {
276292
yield event
277293

278294
if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
279-
yield valueEvent({ from: peer, value: event.record.value }, options)
295+
yield valueEvent({
296+
from: peer.id,
297+
value: event.record.value,
298+
path
299+
}, options)
280300
}
281301
}
282302
}

packages/kad-dht/src/content-routing/index.ts

Lines changed: 91 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { PeerSet } from '@libp2p/peer-collections'
2-
import map from 'it-map'
3-
import parallel from 'it-parallel'
4-
import { pipe } from 'it-pipe'
2+
import { Queue } from '@libp2p/utils/queue'
3+
import { pushable } from 'it-pushable'
54
import { ALPHA } from '../constants.js'
65
import { MessageType } from '../message/dht.js'
76
import { toPbPeerInfo } from '../message/utils.js'
@@ -10,7 +9,7 @@ import {
109
peerResponseEvent,
1110
providerEvent
1211
} from '../query/events.js'
13-
import type { KadDHTComponents, PeerResponseEvent, ProviderEvent, QueryEvent } from '../index.js'
12+
import type { FinalPeerEvent, KadDHTComponents, PeerResponseEvent, ProviderEvent, QueryEvent } from '../index.js'
1413
import type { Message } from '../message/dht.js'
1514
import type { Network } from '../network.js'
1615
import type { PeerRouting } from '../peer-routing/index.js'
@@ -98,54 +97,76 @@ export class ContentRouting {
9897
}
9998

10099
let sent = 0
100+
const self = this
101+
102+
async function * publishProviderRecord (event: FinalPeerEvent): AsyncGenerator<QueryEvent, void, undefined> {
103+
try {
104+
self.log('sending provider record for %s to %p', key, event.peer.id)
105+
106+
for await (const addProviderEvent of self.network.sendMessage(event.peer.id, msg, {
107+
...options,
108+
path: event.path
109+
})) {
110+
if (addProviderEvent.name === 'PEER_RESPONSE') {
111+
self.log('sent provider record for %s to %p', key, event.peer.id)
112+
sent++
113+
}
101114

102-
const maybeNotifyPeer = (event: QueryEvent) => {
103-
return async () => {
104-
if (event.name !== 'FINAL_PEER') {
105-
return [event]
115+
yield addProviderEvent
106116
}
117+
} catch (err: any) {
118+
self.log.error('error sending provide record to peer %p', event.peer.id, err)
119+
yield queryErrorEvent({
120+
from: event.peer.id,
121+
error: err,
122+
path: event.path
123+
}, options)
124+
}
125+
}
107126

108-
const events = []
127+
const events = pushable<QueryEvent>({
128+
objectMode: true
129+
})
109130

110-
this.log('putProvider %s to %p', key, event.peer.id)
131+
const queue = new Queue({
132+
concurrency: ALPHA
133+
})
134+
queue.addEventListener('idle', () => {
135+
events.end()
136+
})
137+
queue.addEventListener('error', (err) => {
138+
this.log.error('error publishing provider record to peer - %e', err)
139+
})
111140

112-
try {
113-
this.log('sending provider record for %s to %p', key, event.peer.id)
114-
115-
for await (const sendEvent of this.network.sendMessage(event.peer.id, msg, {
116-
...options,
117-
path: event.path ?? -1
118-
})) {
119-
if (sendEvent.name === 'PEER_RESPONSE') {
120-
this.log('sent provider record for %s to %p', key, event.peer.id)
121-
sent++
122-
}
123-
124-
events.push(sendEvent)
125-
}
126-
} catch (err: any) {
127-
this.log.error('error sending provide record to peer %p', event.peer.id, err)
128-
events.push(queryErrorEvent({ from: event.peer.id, error: err }, options))
129-
}
141+
queue.add(async () => {
142+
const finalPeerEvents: FinalPeerEvent[] = []
130143

131-
return events
132-
}
133-
}
144+
for await (const event of this.peerRouting.getClosestPeers(target, options)) {
145+
events.push(event)
134146

135-
// Notify closest peers
136-
yield * pipe(
137-
this.peerRouting.getClosestPeers(target, options),
138-
(source) => map(source, (event) => maybeNotifyPeer(event)),
139-
(source) => parallel(source, {
140-
ordered: false,
141-
concurrency: ALPHA
142-
}),
143-
async function * (source) {
144-
for await (const events of source) {
145-
yield * events
147+
if (event.name !== 'FINAL_PEER') {
148+
continue
146149
}
150+
151+
finalPeerEvents.push(event)
147152
}
148-
)
153+
154+
finalPeerEvents.forEach(event => {
155+
queue.add(async () => {
156+
for await (const notifyEvent of publishProviderRecord(event)) {
157+
events.push(notifyEvent)
158+
}
159+
})
160+
.catch(err => {
161+
this.log.error('error publishing provider record to peer - %e', err)
162+
})
163+
})
164+
})
165+
.catch(err => {
166+
events.end(err)
167+
})
168+
169+
yield * events
149170

150171
this.log('sent provider records to %d peers', sent)
151172
}
@@ -184,8 +205,27 @@ export class ContentRouting {
184205
}
185206
}
186207

187-
yield peerResponseEvent({ from: this.components.peerId, messageType: MessageType.GET_PROVIDERS, providers, path: -1 }, options)
188-
yield providerEvent({ from: this.components.peerId, providers }, options)
208+
yield peerResponseEvent({
209+
from: this.components.peerId,
210+
messageType: MessageType.GET_PROVIDERS,
211+
providers,
212+
path: {
213+
index: -1,
214+
queued: 0,
215+
running: 0,
216+
total: 0
217+
}
218+
}, options)
219+
yield providerEvent({
220+
from: this.components.peerId,
221+
providers,
222+
path: {
223+
index: -1,
224+
queued: 0,
225+
running: 0,
226+
total: 0
227+
}
228+
}, options)
189229

190230
found += providers.length
191231

@@ -203,7 +243,7 @@ export class ContentRouting {
203243
key: target
204244
}
205245

206-
yield * self.network.sendRequest(peer, request, {
246+
yield * self.network.sendRequest(peer.id, request, {
207247
...options,
208248
signal,
209249
path
@@ -230,7 +270,11 @@ export class ContentRouting {
230270
}
231271

232272
if (newProviders.length > 0) {
233-
yield providerEvent({ from: event.from, providers: newProviders }, options)
273+
yield providerEvent({
274+
from: event.from,
275+
providers: newProviders,
276+
path: event.path
277+
}, options)
234278

235279
found += newProviders.length
236280

0 commit comments

Comments
 (0)