@@ -386,20 +386,20 @@ impl<T> Receiver<T> {
386
386
/// s.send(2).await;
387
387
/// });
388
388
///
389
- /// assert_eq!(r.recv().await, Some (1));
390
- /// assert_eq!(r.recv().await, Some (2));
391
- /// assert_eq !(r.recv().await, None );
389
+ /// assert_eq!(r.recv().await, Ok (1));
390
+ /// assert_eq!(r.recv().await, Ok (2));
391
+ /// assert !(r.recv().await.is_err() );
392
392
/// #
393
393
/// # })
394
394
/// ```
395
- pub async fn recv ( & self ) -> Option < T > {
395
+ pub async fn recv ( & self ) -> Result < T , RecvError > {
396
396
struct RecvFuture < ' a , T > {
397
397
channel : & ' a Channel < T > ,
398
398
opt_key : Option < usize > ,
399
399
}
400
400
401
401
impl < T > Future for RecvFuture < ' _ , T > {
402
- type Output = Option < T > ;
402
+ type Output = Result < T , RecvError > ;
403
403
404
404
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
405
405
poll_recv (
@@ -565,12 +565,13 @@ impl<T> Stream for Receiver<T> {
565
565
566
566
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
567
567
let this = & mut * self ;
568
- poll_recv (
568
+ let res = futures_core :: ready! ( poll_recv(
569
569
& this. channel,
570
570
& this. channel. stream_wakers,
571
571
& mut this. opt_key,
572
572
cx,
573
- )
573
+ ) ) ;
574
+ Poll :: Ready ( res. ok ( ) )
574
575
}
575
576
}
576
577
@@ -589,7 +590,7 @@ fn poll_recv<T>(
589
590
wakers : & WakerSet ,
590
591
opt_key : & mut Option < usize > ,
591
592
cx : & mut Context < ' _ > ,
592
- ) -> Poll < Option < T > > {
593
+ ) -> Poll < Result < T , RecvError > > {
593
594
loop {
594
595
// If the current task is in the set, remove it.
595
596
if let Some ( key) = opt_key. take ( ) {
@@ -598,8 +599,8 @@ fn poll_recv<T>(
598
599
599
600
// Try receiving a message.
600
601
match channel. try_recv ( ) {
601
- Ok ( msg) => return Poll :: Ready ( Some ( msg) ) ,
602
- Err ( TryRecvError :: Disconnected ) => return Poll :: Ready ( None ) ,
602
+ Ok ( msg) => return Poll :: Ready ( Ok ( msg) ) ,
603
+ Err ( TryRecvError :: Disconnected ) => return Poll :: Ready ( Err ( RecvError { } ) ) ,
603
604
Err ( TryRecvError :: Empty ) => {
604
605
// Insert this receive operation.
605
606
* opt_key = Some ( wakers. insert ( cx) ) ;
@@ -1031,3 +1032,17 @@ impl Display for TryRecvError {
1031
1032
}
1032
1033
}
1033
1034
}
1035
+
1036
+ /// An error returned from the `recv` method.
1037
+ #[ cfg( feature = "unstable" ) ]
1038
+ #[ cfg_attr( feature = "docs" , doc( cfg( unstable) ) ) ]
1039
+ #[ derive( Debug ) ]
1040
+ pub struct RecvError ;
1041
+
1042
+ impl Error for RecvError { }
1043
+
1044
+ impl Display for RecvError {
1045
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
1046
+ Display :: fmt ( "The channel is empty." , f)
1047
+ }
1048
+ }
0 commit comments