Skip to content

Commit dc5db66

Browse files
refactor to use Index structure and emit multiple providers at once
1 parent b224c59 commit dc5db66

File tree

7 files changed

+226
-173
lines changed

7 files changed

+226
-173
lines changed

examples/distributed-key-value-store.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use libp2p::{
5353
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
5454
NetworkBehaviour, PeerId, Swarm,
5555
};
56-
use libp2p_kad::{GetProvidersProgress, QueryProgress};
56+
use libp2p_kad::GetProvidersOk;
5757
use std::error::Error;
5858

5959
#[async_std::main]
@@ -91,15 +91,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
9191
fn inject_event(&mut self, message: KademliaEvent) {
9292
match message {
9393
KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
94-
QueryProgress::GetProviders(GetProvidersProgress { key, provider, .. }) => {
95-
println!(
96-
"Peer {:?} provides key {:?}",
97-
provider,
98-
std::str::from_utf8(key.as_ref()).unwrap()
99-
);
94+
QueryResult::GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
95+
for provider in &providers {
96+
println!(
97+
"Peer {:?} provides key {:?}",
98+
provider,
99+
std::str::from_utf8(key.as_ref()).unwrap()
100+
);
101+
}
100102
}
101-
},
102-
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
103103
QueryResult::GetProviders(Err(err)) => {
104104
eprintln!("Failed to get providers: {:?}", err);
105105
}

examples/file-sharing.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ mod network {
219219
};
220220
use libp2p::swarm::{ConnectionHandlerUpgrErr, SwarmBuilder, SwarmEvent};
221221
use libp2p::{NetworkBehaviour, Swarm};
222-
use libp2p_kad::{GetProvidersProgress, QueryProgress};
222+
use libp2p_kad::GetProvidersOk;
223223
use std::collections::{HashMap, HashSet};
224224
use std::iter;
225225

@@ -325,12 +325,16 @@ mod network {
325325

326326
/// Find the providers for the given file on the DHT.
327327
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
328-
let (sender, receiver) = mpsc::channel(0);
328+
let (sender, mut receiver) = mpsc::channel(0);
329329
self.sender
330330
.send(Command::GetProviders { file_name, sender })
331331
.await
332332
.expect("Command receiver not to be dropped.");
333-
receiver.collect().await
333+
let mut out = HashSet::new();
334+
while let Some(h) = receiver.next().await {
335+
out.extend(h);
336+
}
337+
out
334338
}
335339

336340
/// Request the content of the given file from the given peer.
@@ -366,7 +370,7 @@ mod network {
366370
event_sender: mpsc::Sender<Event>,
367371
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
368372
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
369-
pending_get_providers: HashMap<QueryId, mpsc::Sender<PeerId>>,
373+
pending_get_providers: HashMap<QueryId, mpsc::Sender<HashSet<PeerId>>>,
370374
pending_request_file:
371375
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
372376
}
@@ -410,7 +414,7 @@ mod network {
410414
) {
411415
match event {
412416
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
413-
KademliaEvent::OutboundQueryCompleted {
417+
KademliaEvent::OutboundQueryProgressed {
414418
id,
415419
result: QueryResult::StartProviding(_),
416420
..
@@ -425,18 +429,18 @@ mod network {
425429
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
426430
KademliaEvent::OutboundQueryProgressed {
427431
id,
428-
result: QueryProgress::GetProviders(GetProvidersProgress { provider, .. }),
432+
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
429433
..
430434
},
431435
)) => {
432436
let _ = self
433437
.pending_get_providers
434438
.get_mut(&id)
435439
.expect("Completed query to be previously pending.")
436-
.send(provider);
440+
.send(providers);
437441
}
438442
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
439-
KademliaEvent::OutboundQueryCompleted {
443+
KademliaEvent::OutboundQueryProgressed {
440444
id,
441445
result: QueryResult::GetProviders(..),
442446
..
@@ -634,7 +638,7 @@ mod network {
634638
},
635639
GetProviders {
636640
file_name: String,
637-
sender: mpsc::Sender<PeerId>,
641+
sender: mpsc::Sender<HashSet<PeerId>>,
638642
},
639643
RequestFile {
640644
file_name: String,

examples/ipfs-kad.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
8585
task::block_on(async move {
8686
loop {
8787
let event = swarm.select_next_some().await;
88-
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
88+
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
8989
result: QueryResult::GetClosestPeers(result),
9090
..
9191
}) = event

misc/metrics/src/kad.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21+
use libp2p_kad::GetProvidersOk;
2122
use prometheus_client::encoding::text::Encode;
2223
use prometheus_client::metrics::counter::Counter;
2324
use prometheus_client::metrics::family::Family;
@@ -162,7 +163,7 @@ impl Metrics {
162163
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
163164
fn record(&self, event: &libp2p_kad::KademliaEvent) {
164165
match event {
165-
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
166+
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
166167
self.query_result_num_requests
167168
.get_or_create(&result.into())
168169
.observe(stats.num_requests().into());
@@ -200,7 +201,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
200201
}
201202
},
202203
libp2p_kad::QueryResult::GetProviders(result) => match result {
203-
Ok(ok) => {}
204+
Ok(GetProvidersOk { providers, .. }) => {
205+
self.query_result_get_providers_ok
206+
.observe(providers.len() as f64);
207+
}
204208
Err(error) => {
205209
self.query_result_get_providers_error
206210
.get_or_create(&error.into())
@@ -210,11 +214,6 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
210214
_ => {}
211215
}
212216
}
213-
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, .. } => match result {
214-
libp2p_kad::QueryProgress::GetProviders(_) => {
215-
self.query_result_get_providers_ok.observe(1.);
216-
}
217-
},
218217
libp2p_kad::KademliaEvent::RoutingUpdated {
219218
is_new_peer,
220219
old_peer,

0 commit comments

Comments
 (0)