1
+ use crate :: unfold_state:: UnfoldState ;
1
2
use core:: fmt;
2
3
use core:: pin:: Pin ;
3
4
use futures_core:: future:: Future ;
@@ -8,20 +9,15 @@ use futures_core::task::{Context, Poll};
8
9
use futures_sink:: Sink ;
9
10
use pin_project_lite:: pin_project;
10
11
11
- struct StateFn < S , F > {
12
- state : S ,
13
- f : F ,
14
- }
15
-
16
12
pin_project ! {
17
13
/// Stream for the [`scan`](super::StreamExt::scan) method.
18
14
#[ must_use = "streams do nothing unless polled" ]
19
15
pub struct Scan <St : Stream , S , Fut , F > {
20
16
#[ pin]
21
17
stream: St ,
22
- state_f : Option < StateFn < S , F >> ,
18
+ f : F ,
23
19
#[ pin]
24
- future : Option < Fut >,
20
+ state : UnfoldState < S , Fut >,
25
21
}
26
22
}
27
23
35
31
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
36
32
f. debug_struct ( "Scan" )
37
33
. field ( "stream" , & self . stream )
38
- . field ( "state" , & self . state_f . as_ref ( ) . map ( |s| & s. state ) )
39
- . field ( "future" , & self . future )
34
+ . field ( "state" , & self . state )
40
35
. field ( "done_taking" , & self . is_done_taking ( ) )
41
36
. finish ( )
42
37
}
@@ -45,18 +40,18 @@ where
45
40
impl < St : Stream , S , Fut , F > Scan < St , S , Fut , F > {
46
41
/// Checks if internal state is `None`.
47
42
fn is_done_taking ( & self ) -> bool {
48
- self . state_f . is_none ( )
43
+ self . state . is_empty ( )
49
44
}
50
45
}
51
46
52
47
impl < B , St , S , Fut , F > Scan < St , S , Fut , F >
53
48
where
54
49
St : Stream ,
55
- F : FnMut ( & mut S , St :: Item ) -> Fut ,
56
- Fut : Future < Output = Option < B > > ,
50
+ F : FnMut ( S , St :: Item ) -> Fut ,
51
+ Fut : Future < Output = Option < ( S , B ) > > ,
57
52
{
58
53
pub ( super ) fn new ( stream : St , initial_state : S , f : F ) -> Self {
59
- Self { stream, state_f : Some ( StateFn { state : initial_state, f } ) , future : None }
54
+ Self { stream, f , state : UnfoldState :: Value { value : initial_state } }
60
55
}
61
56
62
57
delegate_access_inner ! ( stream, St , ( ) ) ;
65
60
impl < B , St , S , Fut , F > Stream for Scan < St , S , Fut , F >
66
61
where
67
62
St : Stream ,
68
- F : FnMut ( & mut S , St :: Item ) -> Fut ,
69
- Fut : Future < Output = Option < B > > ,
63
+ F : FnMut ( S , St :: Item ) -> Fut ,
64
+ Fut : Future < Output = Option < ( S , B ) > > ,
70
65
{
71
66
type Item = B ;
72
67
@@ -78,18 +73,20 @@ where
78
73
let mut this = self . project ( ) ;
79
74
80
75
Poll :: Ready ( loop {
81
- if let Some ( fut) = this. future . as_mut ( ) . as_pin_mut ( ) {
82
- let item = ready ! ( fut. poll( cx) ) ;
83
- this. future . set ( None ) ;
84
-
85
- if item. is_none ( ) {
86
- * this. state_f = None ;
76
+ if let Some ( fut) = this. state . as_mut ( ) . project_future ( ) {
77
+ match ready ! ( fut. poll( cx) ) {
78
+ None => {
79
+ this. state . set ( UnfoldState :: Empty ) ;
80
+ break None ;
81
+ }
82
+ Some ( ( state, item) ) => {
83
+ this. state . set ( UnfoldState :: Value { value : state } ) ;
84
+ break Some ( item) ;
85
+ }
87
86
}
88
-
89
- break item;
90
87
} else if let Some ( item) = ready ! ( this. stream. as_mut( ) . poll_next( cx) ) {
91
- let state_f = this. state_f . as_mut ( ) . unwrap ( ) ;
92
- this. future . set ( Some ( ( state_f . f ) ( & mut state_f . state , item) ) )
88
+ let state = this. state . as_mut ( ) . take_value ( ) . unwrap ( ) ;
89
+ this. state . set ( UnfoldState :: Future { future : ( this . f ) ( state, item) } )
93
90
} else {
94
91
break None ;
95
92
}
@@ -108,11 +105,11 @@ where
108
105
impl < B , St , S , Fut , F > FusedStream for Scan < St , S , Fut , F >
109
106
where
110
107
St : FusedStream ,
111
- F : FnMut ( & mut S , St :: Item ) -> Fut ,
112
- Fut : Future < Output = Option < B > > ,
108
+ F : FnMut ( S , St :: Item ) -> Fut ,
109
+ Fut : Future < Output = Option < ( S , B ) > > ,
113
110
{
114
111
fn is_terminated ( & self ) -> bool {
115
- self . is_done_taking ( ) || self . future . is_none ( ) && self . stream . is_terminated ( )
112
+ self . is_done_taking ( ) || ! self . state . is_future ( ) && self . stream . is_terminated ( )
116
113
}
117
114
}
118
115
0 commit comments