protocols/kad/: Replace manual procedural state machines with async/await #3130

Closed
@mxinden

Description

Reading from and writing to inbound and outbound streams in libp2p-kad is implemented via hand-written procedural (sequential) state machines. This logic can be simplified by using Rust's async/await.

Manual state machines in libp2p-kad:

/// State of an active outbound substream.
enum OutboundSubstreamState<TUserData> {
    /// We haven't started opening the outgoing substream yet.
    /// Contains the request we want to send, and the user data if we expect an answer.
    PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
    /// Waiting to send a message to the remote.
    PendingSend(
        KadOutStreamSink<NegotiatedSubstream>,
        KadRequestMsg,
        Option<TUserData>,
    ),
    /// Waiting to flush the substream so that the data arrives to the remote.
    PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
    /// Waiting for an answer back from the remote.
    // TODO: add timeout
    WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
    /// An error happened on the substream and we should report the error to the user.
    ReportError(KademliaHandlerQueryErr, TUserData),
    /// The substream is being closed.
    Closing(KadOutStreamSink<NegotiatedSubstream>),
    /// The substream is complete and will not perform any more work.
    Done,
    Poisoned,
}

Example of using async/await in libp2p-relay (the `StopMessage` type below belongs to the relay stop protocol):

pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
    let msg = StopMessage {
        r#type: stop_message::Type::Status.into(),
        peer: None,
        limit: None,
        status: Some(Status::Ok.into()),
    };

    self.send(msg).await?;

    let FramedParts {
        io,
        read_buffer,
        write_buffer,
        ..
    } = self.substream.into_parts();
    assert!(
        write_buffer.is_empty(),
        "Expect a flushed Framed to have an empty write buffer."
    );

    Ok((io, read_buffer.freeze()))
}
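
To illustrate how the outbound states listed above could collapse into sequential code, here is a minimal sketch using only the standard library. `MockStream`, `request_response`, and `block_on` are hypothetical names made up for this illustration and are not libp2p APIs. The mock's futures always complete immediately, so the tiny executor only has to poll in a loop. A real handler would drive the future from its own poll function instead.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory stand-in for a negotiated substream (not a libp2p type).
struct MockStream {
    sent: Vec<String>,
    flushed: bool,
    closed: bool,
    incoming: VecDeque<String>,
}

impl MockStream {
    fn new(incoming: Vec<String>) -> Self {
        MockStream { sent: Vec::new(), flushed: false, closed: false, incoming: incoming.into() }
    }

    async fn send(&mut self, msg: String) -> Result<(), String> {
        self.sent.push(msg);
        self.flushed = false;
        Ok(())
    }

    async fn flush(&mut self) -> Result<(), String> {
        self.flushed = true;
        Ok(())
    }

    async fn recv(&mut self) -> Result<String, String> {
        self.incoming
            .pop_front()
            .ok_or_else(|| "unexpected end of stream".to_string())
    }

    async fn close(&mut self) -> Result<(), String> {
        self.closed = true;
        Ok(())
    }
}

/// One request/response exchange. Each `.await` takes the place of one
/// variant of the hand-written state enum.
async fn request_response(stream: &mut MockStream, request: String) -> Result<String, String> {
    stream.send(request).await?; // was: PendingSend
    stream.flush().await?; // was: PendingFlush
    let answer = stream.recv().await?; // was: WaitingAnswer
    stream.close().await?; // was: Closing
    Ok(answer) // was: Done; `?` replaces ReportError
}

/// Waker that does nothing; sufficient because the mock futures never return Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for this sketch only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In this form the compiler generates the state machine, and the early return via `?` means no `Poisoned` placeholder variant is needed to move values between states.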
