Skip to content

Commit 0ef6feb

Browse files
feat(swarm): don't have ConnectionHandlers close connections
This PR implements the long-awaited design of disallowing `ConnectionHandler`s to close entire connections. Instead, users should close connections via `ToSwarm::CloseConnection` from a `NetworkBehaviour` or - even better - from the `Swarm` via `close_connection`. A `NetworkBehaviour` also does not have a "full" view onto how a connection is used but at least it can correlate whether it created the connection via the `ConnectionId`. In general, the more modular and friendly approach is to stop "using" a connection if a particular protocol no longer needs it. As a result of the keep-alive algorithm, such a connection is then closed automatically. Depends-on: #4745. Depends-on: #4718. Depends-on: #4749. Related: #3353. Related: #4714. Resolves: #3591. Pull-Request: #4755.
1 parent e6905fe commit 0ef6feb

File tree

33 files changed

+85
-303
lines changed

33 files changed

+85
-303
lines changed

examples/file-sharing/src/network.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use async_std::io;
2-
use either::Either;
31
use futures::channel::{mpsc, oneshot};
42
use futures::prelude::*;
53

@@ -208,10 +206,7 @@ impl EventLoop {
208206
}
209207
}
210208

211-
async fn handle_event(
212-
&mut self,
213-
event: SwarmEvent<BehaviourEvent, Either<void::Void, io::Error>>,
214-
) {
209+
async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
215210
match event {
216211
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
217212
kad::Event::OutboundQueryProgressed {

misc/metrics/src/identify.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
123123
}
124124
}
125125

126-
impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
127-
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
126+
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
127+
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
128128
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
129129
peer_id,
130130
num_established,

misc/metrics/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ impl Recorder<libp2p_relay::Event> for Metrics {
138138
}
139139
}
140140

141-
impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
142-
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
141+
impl<TBvEv> Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
142+
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
143143
self.swarm.record(event);
144144

145145
#[cfg(feature = "identify")]

misc/metrics/src/swarm.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ impl Metrics {
185185
}
186186
}
187187

188-
impl<TBvEv, THandleErr> super::Recorder<SwarmEvent<TBvEv, THandleErr>> for Metrics {
189-
fn record(&self, event: &SwarmEvent<TBvEv, THandleErr>) {
188+
impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
189+
fn record(&self, event: &SwarmEvent<TBvEv>) {
190190
match event {
191191
SwarmEvent::Behaviour(_) => {}
192192
SwarmEvent::ConnectionEstablished {
@@ -359,15 +359,13 @@ struct ConnectionClosedLabels {
359359
enum ConnectionError {
360360
Io,
361361
KeepAliveTimeout,
362-
Handler,
363362
}
364363

365-
impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
366-
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
364+
impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
365+
fn from(value: &libp2p_swarm::ConnectionError) -> Self {
367366
match value {
368367
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
369368
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
370-
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
371369
}
372370
}
373371
}

protocols/dcutr/src/handler/relayed.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use std::collections::VecDeque;
4040
use std::io;
4141
use std::task::{Context, Poll};
4242
use std::time::Duration;
43-
use void::Void;
4443

4544
#[derive(Debug)]
4645
pub enum Command {
@@ -63,7 +62,6 @@ pub struct Handler {
6362
<Self as ConnectionHandler>::OutboundProtocol,
6463
<Self as ConnectionHandler>::OutboundOpenInfo,
6564
<Self as ConnectionHandler>::ToBehaviour,
66-
<Self as ConnectionHandler>::Error,
6765
>,
6866
>,
6967

@@ -182,7 +180,6 @@ impl Handler {
182180
impl ConnectionHandler for Handler {
183181
type FromBehaviour = Command;
184182
type ToBehaviour = Event;
185-
type Error = Void;
186183
type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
187184
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
188185
type OutboundOpenInfo = ();
@@ -229,12 +226,7 @@ impl ConnectionHandler for Handler {
229226
&mut self,
230227
cx: &mut Context<'_>,
231228
) -> Poll<
232-
ConnectionHandlerEvent<
233-
Self::OutboundProtocol,
234-
Self::OutboundOpenInfo,
235-
Self::ToBehaviour,
236-
Self::Error,
237-
>,
229+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
238230
> {
239231
// Return queued events.
240232
if let Some(event) = self.queued_events.pop_front() {

protocols/gossipsub/src/handler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use std::{
3838
pin::Pin,
3939
task::{Context, Poll},
4040
};
41-
use void::Void;
4241

4342
/// The event emitted by the Handler. This informs the behaviour of various events created
4443
/// by the handler.
@@ -220,7 +219,6 @@ impl EnabledHandler {
220219
<Handler as ConnectionHandler>::OutboundProtocol,
221220
<Handler as ConnectionHandler>::OutboundOpenInfo,
222221
<Handler as ConnectionHandler>::ToBehaviour,
223-
<Handler as ConnectionHandler>::Error,
224222
>,
225223
> {
226224
if !self.peer_kind_sent {
@@ -391,7 +389,6 @@ impl EnabledHandler {
391389
impl ConnectionHandler for Handler {
392390
type FromBehaviour = HandlerIn;
393391
type ToBehaviour = HandlerEvent;
394-
type Error = Void;
395392
type InboundOpenInfo = ();
396393
type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
397394
type OutboundOpenInfo = ();
@@ -434,12 +431,7 @@ impl ConnectionHandler for Handler {
434431
&mut self,
435432
cx: &mut Context<'_>,
436433
) -> Poll<
437-
ConnectionHandlerEvent<
438-
Self::OutboundProtocol,
439-
Self::OutboundOpenInfo,
440-
Self::ToBehaviour,
441-
Self::Error,
442-
>,
434+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
443435
> {
444436
match self {
445437
Handler::Enabled(handler) => handler.poll(cx),

protocols/identify/src/handler.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use libp2p_swarm::{
3838
};
3939
use smallvec::SmallVec;
4040
use std::collections::HashSet;
41-
use std::{io, task::Context, task::Poll, time::Duration};
41+
use std::{task::Context, task::Poll, time::Duration};
4242
use tracing::Level;
4343

4444
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
@@ -57,7 +57,6 @@ pub struct Handler {
5757
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
5858
(),
5959
Event,
60-
io::Error,
6160
>; 4],
6261
>,
6362

@@ -282,7 +281,6 @@ impl Handler {
282281
impl ConnectionHandler for Handler {
283282
type FromBehaviour = InEvent;
284283
type ToBehaviour = Event;
285-
type Error = io::Error;
286284
type InboundProtocol =
287285
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
288286
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
@@ -320,9 +318,7 @@ impl ConnectionHandler for Handler {
320318
fn poll(
321319
&mut self,
322320
cx: &mut Context<'_>,
323-
) -> Poll<
324-
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
325-
> {
321+
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event>> {
326322
if let Some(event) = self.events.pop() {
327323
return Poll::Ready(event);
328324
}

protocols/kad/src/handler.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,6 @@ impl Handler {
597597
impl ConnectionHandler for Handler {
598598
type FromBehaviour = HandlerIn;
599599
type ToBehaviour = HandlerEvent;
600-
type Error = io::Error; // TODO: better error type?
601600
type InboundProtocol = Either<ProtocolConfig, upgrade::DeniedUpgrade>;
602601
type OutboundProtocol = ProtocolConfig;
603602
type OutboundOpenInfo = ();
@@ -711,12 +710,7 @@ impl ConnectionHandler for Handler {
711710
&mut self,
712711
cx: &mut Context<'_>,
713712
) -> Poll<
714-
ConnectionHandlerEvent<
715-
Self::OutboundProtocol,
716-
Self::OutboundOpenInfo,
717-
Self::ToBehaviour,
718-
Self::Error,
719-
>,
713+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
720714
> {
721715
match &mut self.protocol_status {
722716
Some(status) if !status.reported => {
@@ -846,7 +840,7 @@ impl Handler {
846840
}
847841

848842
impl futures::Stream for OutboundSubstreamState {
849-
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
843+
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;
850844

851845
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
852846
let this = self.get_mut();
@@ -978,7 +972,7 @@ impl futures::Stream for OutboundSubstreamState {
978972
}
979973

980974
impl futures::Stream for InboundSubstreamState {
981-
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
975+
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;
982976

983977
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
984978
let this = self.get_mut();

protocols/perf/src/client/handler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use libp2p_swarm::{
3535
},
3636
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
3737
};
38-
use void::Void;
3938

4039
use crate::client::{RunError, RunId};
4140
use crate::{RunParams, RunUpdate};
@@ -59,7 +58,6 @@ pub struct Handler {
5958
<Self as ConnectionHandler>::OutboundProtocol,
6059
<Self as ConnectionHandler>::OutboundOpenInfo,
6160
<Self as ConnectionHandler>::ToBehaviour,
62-
<Self as ConnectionHandler>::Error,
6361
>,
6462
>,
6563

@@ -87,7 +85,6 @@ impl Default for Handler {
8785
impl ConnectionHandler for Handler {
8886
type FromBehaviour = Command;
8987
type ToBehaviour = Event;
90-
type Error = Void;
9188
type InboundProtocol = DeniedUpgrade;
9289
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
9390
type OutboundOpenInfo = ();
@@ -159,12 +156,7 @@ impl ConnectionHandler for Handler {
159156
&mut self,
160157
cx: &mut Context<'_>,
161158
) -> Poll<
162-
ConnectionHandlerEvent<
163-
Self::OutboundProtocol,
164-
Self::OutboundOpenInfo,
165-
Self::ToBehaviour,
166-
Self::Error,
167-
>,
159+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
168160
> {
169161
if let Some(event) = self.queued_events.pop_front() {
170162
return Poll::Ready(event);

protocols/perf/src/server/handler.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ impl Default for Handler {
6363
impl ConnectionHandler for Handler {
6464
type FromBehaviour = Void;
6565
type ToBehaviour = Event;
66-
type Error = Void;
6766
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
6867
type OutboundProtocol = DeniedUpgrade;
6968
type OutboundOpenInfo = Void;
@@ -121,12 +120,7 @@ impl ConnectionHandler for Handler {
121120
&mut self,
122121
cx: &mut Context<'_>,
123122
) -> Poll<
124-
ConnectionHandlerEvent<
125-
Self::OutboundProtocol,
126-
Self::OutboundOpenInfo,
127-
Self::ToBehaviour,
128-
Self::Error,
129-
>,
123+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
130124
> {
131125
loop {
132126
match self.inbound.poll_unpin(cx) {

protocols/ping/src/handler.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ impl Handler {
209209
impl ConnectionHandler for Handler {
210210
type FromBehaviour = Void;
211211
type ToBehaviour = Result<Duration, Failure>;
212-
type Error = Void;
213212
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
214213
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
215214
type OutboundOpenInfo = ();
@@ -225,14 +224,8 @@ impl ConnectionHandler for Handler {
225224
fn poll(
226225
&mut self,
227226
cx: &mut Context<'_>,
228-
) -> Poll<
229-
ConnectionHandlerEvent<
230-
ReadyUpgrade<StreamProtocol>,
231-
(),
232-
Result<Duration, Failure>,
233-
Self::Error,
234-
>,
235-
> {
227+
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
228+
{
236229
match self.state {
237230
State::Inactive { reported: true } => {
238231
return Poll::Pending; // nothing to do on this connection

protocols/relay/src/behaviour/handler.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,6 @@ pub struct Handler {
339339
<Self as ConnectionHandler>::OutboundProtocol,
340340
<Self as ConnectionHandler>::OutboundOpenInfo,
341341
<Self as ConnectionHandler>::ToBehaviour,
342-
<Self as ConnectionHandler>::Error,
343342
>,
344343
>,
345344

@@ -482,7 +481,6 @@ type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
482481
impl ConnectionHandler for Handler {
483482
type FromBehaviour = In;
484483
type ToBehaviour = Event;
485-
type Error = void::Void;
486484
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
487485
type InboundOpenInfo = ();
488486
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
@@ -593,12 +591,7 @@ impl ConnectionHandler for Handler {
593591
&mut self,
594592
cx: &mut Context<'_>,
595593
) -> Poll<
596-
ConnectionHandlerEvent<
597-
Self::OutboundProtocol,
598-
Self::OutboundOpenInfo,
599-
Self::ToBehaviour,
600-
Self::Error,
601-
>,
594+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
602595
> {
603596
// Return queued events.
604597
if let Some(event) = self.queued_events.pop_front() {

protocols/relay/src/priv_client/handler.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ pub struct Handler {
101101
<Handler as ConnectionHandler>::OutboundProtocol,
102102
<Handler as ConnectionHandler>::OutboundOpenInfo,
103103
<Handler as ConnectionHandler>::ToBehaviour,
104-
<Handler as ConnectionHandler>::Error,
105104
>,
106105
>,
107106

@@ -230,7 +229,6 @@ impl Handler {
230229
impl ConnectionHandler for Handler {
231230
type FromBehaviour = In;
232231
type ToBehaviour = Event;
233-
type Error = void::Void;
234232
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
235233
type InboundOpenInfo = ();
236234
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
@@ -275,12 +273,7 @@ impl ConnectionHandler for Handler {
275273
&mut self,
276274
cx: &mut Context<'_>,
277275
) -> Poll<
278-
ConnectionHandlerEvent<
279-
Self::OutboundProtocol,
280-
Self::OutboundOpenInfo,
281-
Self::ToBehaviour,
282-
Self::Error,
283-
>,
276+
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
284277
> {
285278
loop {
286279
debug_assert_eq!(

protocols/request-response/src/handler.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,6 @@ where
367367
{
368368
type FromBehaviour = OutboundMessage<TCodec>;
369369
type ToBehaviour = Event<TCodec>;
370-
type Error = void::Void;
371370
type InboundProtocol = Protocol<TCodec::Protocol>;
372371
type OutboundProtocol = Protocol<TCodec::Protocol>;
373372
type OutboundOpenInfo = ();
@@ -390,8 +389,7 @@ where
390389
fn poll(
391390
&mut self,
392391
cx: &mut Context<'_>,
393-
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour, Self::Error>>
394-
{
392+
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour>> {
395393
match self.worker_streams.poll_unpin(cx) {
396394
Poll::Ready((_, Ok(Ok(event)))) => {
397395
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));

0 commit comments

Comments
 (0)