Skip to content

Commit 38ce246

Browse files
feat: improve provider fetching
Integrates libp2p/rust-libp2p#2712 to fetch providers more quickly. For widely available providers this reduces the fetch time considerably.
1 parent 652b7ff commit 38ce246

File tree

4 files changed

+134
-39
lines changed

4 files changed

+134
-39
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ members = [
1818

1919
[patch.crates-io]
2020
# TODO: switch to crates.io once 0.45 is released
21-
libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
21+
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" }
22+
# libp2p = { path = "../rust-libp2p" }

iroh-p2p/src/behaviour.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,18 @@ impl NodeBehaviour {
9393
.into();
9494

9595
let kad = if config.kademlia {
96-
let local_peer_id = local_key.public().to_peer_id();
96+
let pub_key = local_key.public();
97+
9798
// TODO: persist to store
98-
let store = MemoryStore::new(local_peer_id.to_owned());
99+
let store = MemoryStore::new(pub_key.to_peer_id());
100+
99101
// TODO: make user configurable
100102
let mut kad_config = KademliaConfig::default();
101103
kad_config.set_parallelism(16usize.try_into().unwrap());
102104
// TODO: potentially lower (this is per query)
103105
kad_config.set_query_timeout(Duration::from_secs(5));
104106

105-
let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config);
107+
let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config);
106108
for multiaddr in &config.bootstrap_peers {
107109
// TODO: move parsing into config
108110
let mut addr = multiaddr.to_owned();
@@ -113,9 +115,12 @@ impl NodeBehaviour {
113115
warn!("Could not parse bootstrap addr {}", multiaddr);
114116
}
115117
}
118+
119+
// Trigger initial bootstrap
116120
if let Err(e) = kademlia.bootstrap() {
117121
warn!("Kademlia bootstrap failed: {}", e);
118122
}
123+
119124
Some(kademlia)
120125
} else {
121126
None
@@ -163,4 +168,12 @@ impl NodeBehaviour {
163168
kad.add_address(peer, addr);
164169
}
165170
}
171+
172+
pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) {
173+
if let Some(kad) = self.kad.as_mut() {
174+
if let Some(mut query) = kad.query_mut(id) {
175+
query.finish();
176+
}
177+
}
178+
}
166179
}

iroh-p2p/src/rpc.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use futures::channel::oneshot;
99
use libp2p::kad::record::Key;
1010
use libp2p::Multiaddr;
1111
use libp2p::PeerId;
12+
use tokio::sync::mpsc;
1213
use tonic::{transport::Server as TonicServer, Request, Response, Status};
1314
use tracing::trace;
1415

@@ -84,22 +85,26 @@ impl p2p_server::P2p for P2p {
8485
iroh_metrics::req::set_trace_ctx(&request);
8586
let req = request.into_inner();
8687
trace!("received ProviderRequest: {:?}", req.key);
87-
let (s, r) = oneshot::channel();
88+
let (s, mut r) = mpsc::channel(1024);
8889
let msg = RpcMessage::ProviderRequest {
8990
key: req.key.into(),
9091
response_channel: s,
9192
};
93+
9294
self.sender
9395
.send(msg)
9496
.await
9597
.map_err(|_| Status::internal("receiver dropped"))?;
9698

97-
let providers = r
98-
.await
99-
.map_err(|_| Status::internal("sender dropped"))?
100-
.map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?;
99+
// TODO: streaming response
100+
let mut providers = Vec::new();
101+
while let Some(provider) = r.recv().await {
102+
match provider {
103+
Ok(provider) => providers.push(provider.to_bytes()),
104+
Err(e) => return Err(Status::internal(e)),
105+
}
106+
}
101107

102-
let providers = providers.into_iter().map(|p| p.to_bytes()).collect();
103108
Ok(Response::new(Providers { providers }))
104109
}
105110

@@ -219,7 +224,7 @@ pub enum RpcMessage {
219224
ProviderRequest {
220225
// TODO: potentially change this to Cid, as that is the only key we use for providers
221226
key: Key,
222-
response_channel: oneshot::Sender<Result<HashSet<PeerId>, String>>,
227+
response_channel: mpsc::Sender<Result<PeerId, String>>,
223228
},
224229
NetListeningAddrs(oneshot::Sender<(PeerId, Vec<Multiaddr>)>),
225230
NetPeers(oneshot::Sender<HashMap<PeerId, Vec<Multiaddr>>>),

iroh-p2p/src/service.rs

Lines changed: 104 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::collections::{HashMap, HashSet};
1+
use std::collections::HashMap;
22
use std::num::NonZeroU8;
33
use std::time::Duration;
44

55
use ahash::AHashMap;
66
use anyhow::{anyhow, Context, Result};
77
use async_channel::{bounded as channel, Receiver};
88
use cid::Cid;
9-
use futures::channel::oneshot::{self, Sender as OneShotSender};
9+
use futures::channel::oneshot::Sender as OneShotSender;
1010
use futures_util::stream::StreamExt;
1111
use iroh_rpc_client::Client as RpcClient;
1212
use libp2p::core::muxing::StreamMuxerBox;
@@ -16,7 +16,8 @@ pub use libp2p::gossipsub::{IdentTopic, Topic};
1616
use libp2p::identify::{IdentifyEvent, IdentifyInfo};
1717
use libp2p::identity::Keypair;
1818
use libp2p::kad::{
19-
self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryResult,
19+
self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent,
20+
QueryProgress, QueryResult,
2021
};
2122
use libp2p::metrics::{Metrics, Recorder};
2223
use libp2p::multiaddr::Protocol;
@@ -28,7 +29,7 @@ use libp2p::swarm::{
2829
use libp2p::yamux::WindowUpdateMode;
2930
use libp2p::{core, mplex, noise, yamux, PeerId, Swarm, Transport};
3031
use prometheus_client::registry::Registry;
31-
use tokio::{select, time};
32+
use tokio::{select, sync::mpsc, time};
3233
use tracing::{debug, info, trace, warn};
3334

3435
use iroh_bitswap::{
@@ -62,14 +63,16 @@ pub struct Libp2pService {
6263
}
6364

6465
enum QueryChannel {
65-
GetProviders(Vec<oneshot::Sender<Result<HashSet<PeerId>, String>>>),
66+
GetProviders(Vec<mpsc::Sender<Result<PeerId, String>>>),
6667
}
6768

6869
#[derive(Debug, Hash, PartialEq, Eq)]
6970
enum QueryKey {
7071
ProviderKey(Key),
7172
}
7273

74+
const PROVIDER_LIMIT: usize = 20;
75+
7376
impl Libp2pService {
7477
pub async fn new(
7578
config: Libp2pConfig,
@@ -86,7 +89,7 @@ impl Libp2pService {
8689
.with_max_pending_outgoing(Some(30)) // TODO: configurable
8790
.with_max_established_incoming(Some(config.target_peer_count))
8891
.with_max_established_outgoing(Some(config.target_peer_count))
89-
.with_max_established_per_peer(Some(5)); // TODO: configurable
92+
.with_max_established_per_peer(Some(60)); // TODO: configurable
9093

9194
let node = NodeBehaviour::new(&net_keypair, &config, registry).await?;
9295
let mut swarm = SwarmBuilder::new(transport, node, peer_id)
@@ -221,42 +224,56 @@ impl Libp2pService {
221224
Event::Kademlia(e) => {
222225
self.metrics.record(&e);
223226
if let KademliaEvent::OutboundQueryCompleted { result, .. } = e {
224-
debug!("kad: {:?}", result);
227+
debug!("kad completed: {:?}", result);
225228
match result {
226-
QueryResult::GetProviders(Ok(GetProvidersOk {
227-
providers, key, ..
228-
})) => {
229-
if let Some(QueryChannel::GetProviders(chans)) =
230-
self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
231-
{
232-
for chan in chans.into_iter() {
233-
debug!("Sending providers for {:?}", key);
234-
chan.send(Ok(providers.clone())).ok();
235-
}
236-
} else {
237-
debug!("No listeners");
238-
}
229+
QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => {
230+
let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key));
239231
}
232+
240233
QueryResult::GetProviders(Err(err)) => {
241-
let (key, providers) = match err {
242-
GetProvidersError::Timeout { key, providers, .. } => {
243-
(key, providers)
244-
}
234+
let key = match err {
235+
GetProvidersError::Timeout { key, .. } => key,
245236
};
246237
debug!("GetProviders timeout {:?}", key);
247238
if let Some(QueryChannel::GetProviders(chans)) =
248-
self.kad_queries.remove(&QueryKey::ProviderKey(key.clone()))
239+
self.kad_queries.remove(&QueryKey::ProviderKey(key))
249240
{
250241
for chan in chans.into_iter() {
251-
debug!("Sending providers for {:?}", key);
252-
chan.send(Ok(providers.clone())).ok();
242+
chan.send(Err("Timeout".into())).await.ok();
253243
}
254244
}
255245
}
256246
other => {
257247
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
258248
}
259249
}
250+
} else if let KademliaEvent::OutboundQueryProgressed {
251+
id, result, count, ..
252+
} = e
253+
{
254+
debug!("kad progressed: {:?}", result);
255+
match result {
256+
QueryProgress::GetProviders(GetProvidersProgress {
257+
key, provider, ..
258+
}) => {
259+
if count >= PROVIDER_LIMIT {
260+
debug!("finish provider query {}/{}", count, PROVIDER_LIMIT);
261+
// Finish query if we have enough providers.
262+
self.swarm.behaviour_mut().finish_query(&id);
263+
}
264+
265+
if let Some(QueryChannel::GetProviders(chans)) = self
266+
.kad_queries
267+
.get_mut(&QueryKey::ProviderKey(key.clone()))
268+
{
269+
for chan in chans.iter_mut() {
270+
chan.send(Ok(provider)).await.ok();
271+
}
272+
} else {
273+
debug!("No listeners");
274+
}
275+
}
276+
}
260277
}
261278
}
262279
Event::Identify(e) => {
@@ -342,7 +359,10 @@ impl Libp2pService {
342359
);
343360
}
344361
} else {
345-
response_channel.send(Ok(Default::default())).ok();
362+
response_channel
363+
.send(Err("kademlia is not available".into()))
364+
.await
365+
.ok();
346366
}
347367
}
348368
RpcMessage::NetListeningAddrs(response_channel) => {
@@ -434,3 +454,59 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo
434454
.timeout(Duration::from_secs(20)) // TODO: configurable
435455
.boxed()
436456
}
457+
458+
#[cfg(test)]
459+
mod tests {
460+
use crate::metrics;
461+
462+
use super::*;
463+
use anyhow::Result;
464+
use libp2p::identity::ed25519;
465+
466+
#[tokio::test]
467+
async fn test_fetch_providers() -> Result<()> {
468+
let mut prom_registry = Registry::default();
469+
let libp2p_metrics = Metrics::new(&mut prom_registry);
470+
let net_keypair = {
471+
let gen_keypair = ed25519::Keypair::generate();
472+
Keypair::Ed25519(gen_keypair)
473+
};
474+
475+
let mut network_config = Libp2pConfig::default();
476+
network_config.metrics.debug = true;
477+
let metrics_config = network_config.metrics.clone();
478+
479+
let mut p2p_service = Libp2pService::new(
480+
network_config,
481+
net_keypair,
482+
&mut prom_registry,
483+
libp2p_metrics,
484+
)
485+
.await?;
486+
487+
let metrics_handle = iroh_metrics::init_with_registry(
488+
metrics::metrics_config_with_compile_time_info(metrics_config),
489+
prom_registry,
490+
)
491+
.await
492+
.expect("failed to initialize metrics");
493+
494+
let cfg = iroh_rpc_client::Config::default();
495+
let p2p_task = tokio::task::spawn(async move {
496+
p2p_service.run().await.unwrap();
497+
});
498+
499+
{
500+
let client = RpcClient::new(&cfg).await?;
501+
let c = "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
502+
.parse()
503+
.unwrap();
504+
let providers = client.p2p.fetch_providers(&c).await?;
505+
assert!(providers.len() >= PROVIDER_LIMIT);
506+
}
507+
508+
p2p_task.abort();
509+
metrics_handle.shutdown();
510+
Ok(())
511+
}
512+
}

0 commit comments

Comments
 (0)