Skip to content

Commit 4b0dc43

Browse files
committed
First try at fixing: Store the future.
Due to lifetime issues I wasn't able to solve yet this moves both the socket and the buffer into the future, which returns them when ready.
1 parent 6ff7aef commit 4b0dc43

File tree

2 files changed

+38
-19
lines changed

2 files changed

+38
-19
lines changed

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ edition = "2018"
77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

99
[dependencies]
10-
async-std = { version = "=1.6.2", features = ["attributes"] }
10+
async-std = { version = "=1.9", features = ["attributes"] }
1111
futures = "0.3.12"

Diff for: src/main.rs

+37-18
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use async_std::net::UdpSocket;
22
use async_std::stream::{Stream, StreamExt};
33
use async_std::task;
44
use futures::future::Future;
5-
use futures::pin_mut;
5+
use futures::{pin_mut, ready};
66
use std::io::Result;
77
use std::net::SocketAddr;
88
use std::pin::Pin;
@@ -54,37 +54,56 @@ async fn send_loop(bind_addr: &str, peer_addr: &str) -> Result<()> {
5454
}
5555
}
5656

57+
type ReceiveResult = Result<(usize, SocketAddr)>;
58+
5759
struct UdpStream {
58-
socket: UdpSocket,
59-
buf: Vec<u8>,
60+
inner: Option<(UdpSocket, Vec<u8>)>,
61+
fut: Option<Pin<Box<dyn Future<Output = (UdpSocket, Vec<u8>, ReceiveResult)> + Send + Sync>>>,
6062
}
6163

6264
impl UdpStream {
6365
pub fn new(socket: UdpSocket) -> Self {
66+
let buf = vec![0u8; 1024];
6467
Self {
65-
socket,
66-
buf: vec![0u8; 1024],
68+
fut: None,
69+
inner: Some((socket, buf)),
6770
}
6871
}
6972
}
70-
7173
impl Stream for UdpStream {
7274
type Item = Result<(Vec<u8>, SocketAddr)>;
73-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74-
let this = self.get_mut();
75-
let res = {
76-
let fut = this.socket.recv_from(&mut this.buf);
77-
pin_mut!(fut);
78-
fut.poll(cx)
75+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76+
let mut fut = if let Some(fut) = self.fut.take() {
77+
fut
78+
} else {
79+
if let Some((socket, buf)) = self.inner.take() {
80+
let fut = recv_next(socket, buf);
81+
Box::pin(fut)
82+
} else {
83+
unreachable!()
84+
}
7985
};
86+
87+
let res = Pin::new(&mut fut).poll(cx);
88+
8089
match res {
81-
Poll::Pending => Poll::Pending,
82-
Poll::Ready(Ok(res)) => {
83-
let buf = this.buf[..res.0].to_vec();
84-
let peer = res.1;
85-
Poll::Ready(Some(Ok((buf, peer))))
90+
Poll::Pending => {
91+
self.fut = Some(fut);
92+
Poll::Pending
8693
}
87-
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
94+
Poll::Ready((socket, buf, res)) => match res {
95+
Ok((n, peer_addr)) => {
96+
let vec = buf[..n].to_vec();
97+
self.inner = Some((socket, buf));
98+
Poll::Ready(Some(Ok((vec, peer_addr))))
99+
}
100+
Err(e) => Poll::Ready(Some(Err(e))),
101+
},
88102
}
89103
}
90104
}
105+
106+
async fn recv_next(socket: UdpSocket, mut buf: Vec<u8>) -> (UdpSocket, Vec<u8>, ReceiveResult) {
107+
let res = socket.recv_from(&mut buf).await;
108+
(socket, buf, res)
109+
}

0 commit comments

Comments
 (0)