File tree 3 files changed +44
-5
lines changed
3 files changed +44
-5
lines changed Original file line number Diff line number Diff line change @@ -62,9 +62,15 @@ impl Connection for SqliteConnection {
62
62
63
63
type Options = SqliteConnectOptions ;
64
64
65
- fn close ( self ) -> BoxFuture < ' static , Result < ( ) , Error > > {
66
- // nothing explicit to do; connection will close in drop
67
- Box :: pin ( future:: ok ( ( ) ) )
65
+ fn close ( mut self ) -> BoxFuture < ' static , Result < ( ) , Error > > {
66
+ Box :: pin ( async move {
67
+ let shutdown = self . worker . shutdown ( ) ;
68
+ // Drop the statement worker and any outstanding statements, which should
69
+ // cover all references to the connection handle outside of the worker thread
70
+ drop ( self ) ;
71
+ // Ensure the worker thread has terminated
72
+ shutdown. await
73
+ } )
68
74
}
69
75
70
76
fn ping ( & mut self ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
Original file line number Diff line number Diff line change @@ -30,6 +30,9 @@ enum StatementWorkerCommand {
30
30
statement : Weak < StatementHandle > ,
31
31
tx : oneshot:: Sender < ( ) > ,
32
32
} ,
33
+ Shutdown {
34
+ tx : oneshot:: Sender < ( ) > ,
35
+ } ,
33
36
}
34
37
35
38
impl StatementWorker {
@@ -72,6 +75,13 @@ impl StatementWorker {
72
75
let _ = tx. send ( ( ) ) ;
73
76
}
74
77
}
78
+ StatementWorkerCommand :: Shutdown { tx } => {
79
+ // drop the connection reference before sending confirmation
80
+ // and ending the command loop
81
+ drop ( conn) ;
82
+ let _ = tx. send ( ( ) ) ;
83
+ return ;
84
+ }
75
85
}
76
86
}
77
87
@@ -127,4 +137,25 @@ impl StatementWorker {
127
137
rx. await . map_err ( |_| Error :: WorkerCrashed )
128
138
}
129
139
}
140
+
141
+ /// Send a command to the worker to shut down the processing thread.
142
+ ///
143
+ /// A `WorkerCrashed` error may be returned if the thread has already stopped.
144
+ /// Subsequent calls to `step()`, `reset()`, or this method will fail with
145
+ /// `WorkerCrashed`. Ensure that any associated statements are dropped first.
146
+ pub ( crate ) fn shutdown ( & mut self ) -> impl Future < Output = Result < ( ) , Error > > {
147
+ let ( tx, rx) = oneshot:: channel ( ) ;
148
+
149
+ let send_res = self
150
+ . tx
151
+ . send ( StatementWorkerCommand :: Shutdown { tx } )
152
+ . map_err ( |_| Error :: WorkerCrashed ) ;
153
+
154
+ async move {
155
+ send_res?;
156
+
157
+ // wait for the response
158
+ rx. await . map_err ( |_| Error :: WorkerCrashed )
159
+ }
160
+ }
130
161
}
Original file line number Diff line number Diff line change @@ -206,7 +206,8 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
206
206
async fn it_opens_in_memory ( ) -> anyhow:: Result < ( ) > {
207
207
// If the filename is ":memory:", then a private, temporary in-memory database
208
208
// is created for the connection.
209
- let _ = SqliteConnection :: connect ( ":memory:" ) . await ?;
209
+ let conn = SqliteConnection :: connect ( ":memory:" ) . await ?;
210
+ conn. close ( ) . await ?;
210
211
211
212
Ok ( ( ) )
212
213
}
@@ -215,7 +216,8 @@ async fn it_opens_in_memory() -> anyhow::Result<()> {
215
216
async fn it_opens_temp_on_disk ( ) -> anyhow:: Result < ( ) > {
216
217
// If the filename is an empty string, then a private, temporary on-disk database will
217
218
// be created.
218
- let _ = SqliteConnection :: connect ( "" ) . await ?;
219
+ let conn = SqliteConnection :: connect ( "" ) . await ?;
220
+ conn. close ( ) . await ?;
219
221
220
222
Ok ( ( ) )
221
223
}
You can’t perform that action at this time.
0 commit comments