Skip to content

Commit 85e3025

Browse files
thomaseizingeralindima
authored andcommitted
feat(request-response): don't close connection on stream errors
Related: libp2p#3591. Pull-Request: libp2p#3913.
1 parent 2d9ae38 commit 85e3025

File tree

4 files changed

+15
-36
lines changed

4 files changed

+15
-36
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/file-sharing/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ either = "1.8"
1313
env_logger = "0.10"
1414
futures = "0.3.28"
1515
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "kad", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] }
16-
multiaddr = { version = "0.17.1" }
16+
multiaddr = { version = "0.17.1" }
17+
void = "1.0.2"

protocols/request-response/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ libp2p-swarm = { version = "0.42.1", path = "../../swarm" }
1919
libp2p-identity = { version = "0.1.0", path = "../../identity" }
2020
rand = "0.8"
2121
smallvec = "1.6.1"
22+
void = "1.0.2"
23+
log = "0.4.17"
2224

2325
[dev-dependencies]
2426
async-std = { version = "1.6.2", features = ["attributes"] }

protocols/request-response/src/handler_priv.rs

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use libp2p_swarm::{
3939
use smallvec::SmallVec;
4040
use std::{
4141
collections::VecDeque,
42-
fmt, io,
42+
fmt,
4343
sync::{
4444
atomic::{AtomicU64, Ordering},
4545
Arc,
@@ -71,8 +71,6 @@ where
7171
substream_timeout: Duration,
7272
/// The current connection keep-alive.
7373
keep_alive: KeepAlive,
74-
/// A pending fatal error that results in the connection being closed.
75-
pending_error: Option<ConnectionHandlerUpgrErr<io::Error>>,
7674
/// Queue of events to emit in `poll()`.
7775
pending_events: VecDeque<Event<TCodec>>,
7876
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
@@ -113,7 +111,6 @@ where
113111
outbound: VecDeque::new(),
114112
inbound: FuturesUnordered::new(),
115113
pending_events: VecDeque::new(),
116-
pending_error: None,
117114
inbound_request_id,
118115
}
119116
}
@@ -156,40 +153,22 @@ where
156153
// the remote peer does not support the requested protocol(s).
157154
self.pending_events
158155
.push_back(Event::OutboundUnsupportedProtocols(info));
156+
log::debug!("outbound stream {info} failed: Failed negotiation");
159157
}
160-
_ => {
161-
// Anything else is considered a fatal error or misbehaviour of
162-
// the remote peer and results in closing the connection.
163-
self.pending_error = Some(error);
158+
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
159+
log::debug!("outbound stream {info} failed: {e}");
164160
}
161+
_ => {}
165162
}
166163
}
167164
fn on_listen_upgrade_error(
168165
&mut self,
169-
ListenUpgradeError { info, error }: ListenUpgradeError<
166+
ListenUpgradeError { error, info }: ListenUpgradeError<
170167
<Self as ConnectionHandler>::InboundOpenInfo,
171168
<Self as ConnectionHandler>::InboundProtocol,
172169
>,
173170
) {
174-
match error {
175-
ConnectionHandlerUpgrErr::Timeout => {
176-
self.pending_events.push_back(Event::InboundTimeout(info))
177-
}
178-
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
179-
// The local peer merely doesn't support the protocol(s) requested.
180-
// This is no reason to close the connection, which may
181-
// successfully communicate with other protocols already.
182-
// An event is reported to permit user code to react to the fact that
183-
// the local peer does not support the requested protocol(s).
184-
self.pending_events
185-
.push_back(Event::InboundUnsupportedProtocols(info));
186-
}
187-
_ => {
188-
// Anything else is considered a fatal error or misbehaviour of
189-
// the remote peer and results in closing the connection.
190-
self.pending_error = Some(error);
191-
}
192-
}
171+
log::debug!("inbound stream {info} failed: {error}");
193172
}
194173
}
195174

@@ -284,7 +263,7 @@ where
284263
{
285264
type InEvent = RequestProtocol<TCodec>;
286265
type OutEvent = Event<TCodec>;
287-
type Error = ConnectionHandlerUpgrErr<io::Error>;
266+
type Error = void::Void;
288267
type InboundProtocol = ResponseProtocol<TCodec>;
289268
type OutboundProtocol = RequestProtocol<TCodec>;
290269
type OutboundOpenInfo = RequestId;
@@ -338,12 +317,6 @@ where
338317
cx: &mut Context<'_>,
339318
) -> Poll<ConnectionHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::OutEvent, Self::Error>>
340319
{
341-
// Check for a pending (fatal) error.
342-
if let Some(err) = self.pending_error.take() {
343-
// The handler will not be polled again by the `Swarm`.
344-
return Poll::Ready(ConnectionHandlerEvent::Close(err));
345-
}
346-
347320
// Drain pending events.
348321
if let Some(event) = self.pending_events.pop_front() {
349322
return Poll::Ready(ConnectionHandlerEvent::Custom(event));

0 commit comments

Comments
 (0)