1
- use core:: mem:: ManuallyDrop ;
2
1
use core:: pin:: Pin ;
3
2
3
+ use futures_core:: ready;
4
+ use pin_project_lite:: pin_project;
5
+
4
6
use crate :: stream:: Stream ;
5
7
use crate :: task:: { Context , Poll } ;
6
8
7
- /// A stream that will repeatedly yield the same list of elements.
8
- #[ derive( Debug ) ]
9
- pub struct Cycle < S > {
10
- orig : S ,
11
- source : ManuallyDrop < S > ,
9
+ pin_project ! {
10
+ /// A stream that will repeatedly yield the same list of elements.
11
+ #[ derive( Debug ) ]
12
+ pub struct Cycle <S > {
13
+ orig: S ,
14
+ #[ pin]
15
+ source: S ,
16
+ }
12
17
}
13
18
14
19
impl < S > Cycle < S >
18
23
pub ( crate ) fn new ( source : S ) -> Self {
19
24
Self {
20
25
orig : source. clone ( ) ,
21
- source : ManuallyDrop :: new ( source) ,
22
- }
23
- }
24
- }
25
-
26
- impl < S > Drop for Cycle < S > {
27
- fn drop ( & mut self ) {
28
- unsafe {
29
- ManuallyDrop :: drop ( & mut self . source ) ;
26
+ source,
30
27
}
31
28
}
32
29
}
@@ -38,17 +35,14 @@ where
38
35
type Item = S :: Item ;
39
36
40
37
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
41
- unsafe {
42
- let this = self . get_unchecked_mut ( ) ;
38
+ let mut this = self . project ( ) ;
43
39
44
- match futures_core:: ready!( Pin :: new_unchecked( & mut * this. source) . poll_next( cx) ) {
45
- Some ( item) => Poll :: Ready ( Some ( item) ) ,
46
- None => {
47
- ManuallyDrop :: drop ( & mut this. source ) ;
48
- this. source = ManuallyDrop :: new ( this. orig . clone ( ) ) ;
49
- Pin :: new_unchecked ( & mut * this. source ) . poll_next ( cx)
50
- }
40
+ match ready ! ( this. source. as_mut( ) . poll_next( cx) ) {
41
+ None => {
42
+ this. source . set ( this. orig . clone ( ) ) ;
43
+ this. source . poll_next ( cx)
51
44
}
45
+ item => Poll :: Ready ( item) ,
52
46
}
53
47
}
54
48
}
0 commit comments