-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(gossipsub): gracefully disable handler on stream errors #3625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
1264345
3c2fbce
b8fed53
f4cfbc3
e7e96ed
e37ba58
b6be9ce
1e06367
415f648
f87949d
9e12f9d
ee6cb02
3443a69
fef9751
7dec223
3163213
e28af53
0507493
b572895
12e9b53
fd4958d
44dce05
6a5f1d0
3432ac0
db59d23
b94ec28
c5e3c41
c02a3a3
e94c2c7
a7ed378
798ef5c
bbdf8f5
af21589
f999f3e
397afa2
9f44adc
b01e86f
552cb08
b42e71e
a958b60
d673ed2
7cb4e41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,8 +36,6 @@ use libp2p_swarm::NegotiatedSubstream; | |
use log::{error, trace, warn}; | ||
use smallvec::SmallVec; | ||
use std::{ | ||
collections::VecDeque, | ||
io, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
time::Duration, | ||
|
@@ -124,9 +122,6 @@ pub struct Handler { | |
/// The amount of time we allow idle connections before disconnecting. | ||
idle_timeout: Duration, | ||
|
||
/// Collection of errors from attempting an upgrade. | ||
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>, | ||
|
||
/// Flag determining whether to maintain the connection to the peer. | ||
keep_alive: KeepAlive, | ||
|
||
|
@@ -174,7 +169,6 @@ impl Handler { | |
peer_kind_sent: false, | ||
protocol_unsupported: false, | ||
idle_timeout, | ||
upgrade_errors: VecDeque::new(), | ||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), | ||
in_mesh: false, | ||
} | ||
|
@@ -187,6 +181,13 @@ impl Handler { | |
<Self as ConnectionHandler>::InboundOpenInfo, | ||
>, | ||
) { | ||
if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { | ||
// Too many inbound substreams have been created, end the connection. | ||
self.keep_alive = KeepAlive::No; | ||
log::info!("Gossipsub error: The maximum number of inbound substreams created has been exceeded."); | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return; | ||
} | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let (substream, peer_kind) = protocol; | ||
|
||
// If the peer doesn't support the protocol, reject all substreams | ||
|
@@ -216,6 +217,12 @@ impl Handler { | |
<Self as ConnectionHandler>::OutboundOpenInfo, | ||
>, | ||
) { | ||
if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { | ||
self.keep_alive = KeepAlive::No; | ||
log::info!("Gossipsub error: The maximum number of outbound substreams created has been exceeded"); | ||
return; | ||
} | ||
|
||
let (substream, peer_kind) = protocol; | ||
|
||
// If the peer doesn't support the protocol, reject all substreams | ||
|
@@ -289,44 +296,15 @@ impl ConnectionHandler for Handler { | |
Self::Error, | ||
>, | ||
> { | ||
// Handle any upgrade errors | ||
if let Some(error) = self.upgrade_errors.pop_front() { | ||
let reported_error = match error { | ||
// Timeout errors get mapped to NegotiationTimeout and we close the connection. | ||
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { | ||
Some(HandlerError::NegotiationTimeout) | ||
} | ||
// There was an error post negotiation, close the connection. | ||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), | ||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { | ||
match negotiation_error { | ||
NegotiationError::Failed => { | ||
// The protocol is not supported | ||
self.protocol_unsupported = true; | ||
if !self.peer_kind_sent { | ||
self.peer_kind_sent = true; | ||
// clear all substreams so the keep alive returns false | ||
self.inbound_substream = None; | ||
self.outbound_substream = None; | ||
self.keep_alive = KeepAlive::No; | ||
return Poll::Ready(ConnectionHandlerEvent::Custom( | ||
HandlerEvent::PeerKind(PeerKind::NotSupported), | ||
)); | ||
} else { | ||
None | ||
} | ||
} | ||
NegotiationError::ProtocolError(e) => { | ||
Some(HandlerError::NegotiationProtocolError(e)) | ||
} | ||
} | ||
} | ||
}; | ||
|
||
// If there was a fatal error, close the connection. | ||
if let Some(error) = reported_error { | ||
return Poll::Ready(ConnectionHandlerEvent::Close(error)); | ||
} | ||
if self.protocol_unsupported && !self.peer_kind_sent { | ||
self.peer_kind_sent = true; | ||
// clear all substreams so the keep alive returns false | ||
self.inbound_substream = None; | ||
self.outbound_substream = None; | ||
self.keep_alive = KeepAlive::No; | ||
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( | ||
PeerKind::NotSupported, | ||
))); | ||
} | ||
|
||
if !self.peer_kind_sent { | ||
|
@@ -338,23 +316,14 @@ impl ConnectionHandler for Handler { | |
} | ||
} | ||
|
||
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { | ||
// Too many inbound substreams have been created, end the connection. | ||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||
HandlerError::MaxInboundSubstreams, | ||
)); | ||
} | ||
// Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking a closer look, I don't think this invariant actually holds. In if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, disable the handler.
self.keep_alive = KeepAlive::No;
log::warn!("The maximum number of inbound substreams created has been exceeded");
return;
} Thus @thomaseizinger am I missing something? |
||
|
||
// determine if we need to create the stream | ||
if !self.send_queue.is_empty() | ||
&& self.outbound_substream.is_none() | ||
&& !self.outbound_substream_establishing | ||
{ | ||
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { | ||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||
HandlerError::MaxOutboundSubstreams, | ||
)); | ||
} | ||
// Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`. | ||
let message = self.send_queue.remove(0); | ||
self.send_queue.shrink_to_fit(); | ||
self.outbound_substream_establishing = true; | ||
|
@@ -475,14 +444,17 @@ impl ConnectionHandler for Handler { | |
Some(OutboundSubstreamState::WaitingOutput(substream)); | ||
} | ||
Err(e) => { | ||
error!("Error sending message: {}", e); | ||
return Poll::Ready(ConnectionHandlerEvent::Close(e)); | ||
log::debug!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logs for errors on outbound substreams have all been downgraded to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To me, a |
||
"Outbound substream error while sending output: {e}" | ||
); | ||
self.outbound_substream = None; | ||
break; | ||
} | ||
} | ||
} | ||
Poll::Ready(Err(e)) => { | ||
error!("Outbound substream error while sending output: {:?}", e); | ||
return Poll::Ready(ConnectionHandlerEvent::Close(e)); | ||
error!("Outbound substream error while sending output: {e:?}"); | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break; | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
Poll::Pending => { | ||
self.keep_alive = KeepAlive::Yes; | ||
|
@@ -504,7 +476,8 @@ impl ConnectionHandler for Handler { | |
Some(OutboundSubstreamState::WaitingOutput(substream)) | ||
} | ||
Poll::Ready(Err(e)) => { | ||
return Poll::Ready(ConnectionHandlerEvent::Close(e)) | ||
log::debug!("Outbound substream error while flushing output: {e}"); | ||
break; | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
Poll::Pending => { | ||
self.keep_alive = KeepAlive::Yes; | ||
|
@@ -525,14 +498,8 @@ impl ConnectionHandler for Handler { | |
break; | ||
} | ||
Poll::Ready(Err(e)) => { | ||
warn!("Outbound substream error while closing: {:?}", e); | ||
return Poll::Ready(ConnectionHandlerEvent::Close( | ||
io::Error::new( | ||
io::ErrorKind::BrokenPipe, | ||
"Failed to close outbound substream", | ||
) | ||
.into(), | ||
)); | ||
warn!("Outbound substream error while closing: {e:?}"); | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break; | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
Poll::Pending => { | ||
self.keep_alive = KeepAlive::No; | ||
|
@@ -571,10 +538,32 @@ impl ConnectionHandler for Handler { | |
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { | ||
self.on_fully_negotiated_outbound(fully_negotiated_outbound) | ||
} | ||
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { | ||
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { | ||
self.outbound_substream_establishing = false; | ||
warn!("Dial upgrade error {:?}", e); | ||
self.upgrade_errors.push_back(e); | ||
|
||
match error { | ||
// Timeout errors get mapped to NegotiationTimeout and we close the connection. | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { | ||
self.keep_alive = KeepAlive::No; | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log::info!("Dial upgrade error: Protocol negotiation timeout."); | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
// There was an error post negotiation, close the connection. | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log::info!("Dial upgrade error: {e}"); | ||
} | ||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { | ||
match negotiation_error { | ||
NegotiationError::Failed => { | ||
// The protocol is not supported | ||
self.protocol_unsupported = true; | ||
log::info!("Dial upgrade error: {}", NegotiationError::Failed); | ||
} | ||
NegotiationError::ProtocolError(e) => { | ||
log::info!("Gossipsub error: Protocol negotiation failed. {e}"); | ||
} | ||
} | ||
thomaseizinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.