Skip to content

Commit ca3e574

Browse files
committed
WIP fix(pool): reimplement pool internals with futures-intrusive
1 parent e33e451 commit ca3e574

File tree

7 files changed

+203
-233
lines changed

7 files changed

+203
-233
lines changed

Cargo.lock

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlx-core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ encoding_rs = { version = "0.8.23", optional = true }
119119
either = "1.5.3"
120120
futures-channel = { version = "0.3.5", default-features = false, features = ["sink", "alloc", "std"] }
121121
futures-core = { version = "0.3.5", default-features = false }
122+
futures-intrusive = "0.4.0"
122123
futures-util = { version = "0.3.5", features = ["sink"] }
123124
generic-array = { version = "0.14.4", default-features = false, optional = true }
124125
hex = "0.4.2"

sqlx-core/src/pool/connection.rs

+29-13
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
use super::inner::{DecrementSizeGuard, SharedPool};
2-
use crate::connection::Connection;
3-
use crate::database::Database;
4-
use crate::error::Error;
5-
use sqlx_rt::spawn;
61
use std::fmt::{self, Debug, Formatter};
72
use std::ops::{Deref, DerefMut};
83
use std::sync::Arc;
94
use std::time::Instant;
105

6+
use futures_intrusive::sync::SemaphoreReleaser;
7+
8+
use crate::connection::Connection;
9+
use crate::database::Database;
10+
use crate::error::Error;
11+
12+
use super::inner::{DecrementSizeGuard, SharedPool};
13+
1114
/// A connection managed by a [`Pool`][crate::pool::Pool].
1215
///
1316
/// Will be returned to the pool on-drop.
@@ -28,8 +31,8 @@ pub(super) struct Idle<DB: Database> {
2831

2932
/// RAII wrapper for connections being handled by functions that may drop them
3033
pub(super) struct Floating<'p, C> {
31-
inner: C,
32-
guard: DecrementSizeGuard<'p>,
34+
pub(super) inner: C,
35+
pub(super) guard: DecrementSizeGuard<'p>,
3336
}
3437

3538
const DEREF_ERR: &str = "(bug) connection already released to pool";
@@ -71,7 +74,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
7174
fn drop(&mut self) {
7275
if let Some(live) = self.live.take() {
7376
let pool = self.pool.clone();
74-
spawn(async move {
77+
sqlx_rt::spawn(async move {
7578
let mut floating = live.float(&pool);
7679

7780
// test the connection on-release to ensure it is still viable
@@ -102,7 +105,8 @@ impl<DB: Database> Live<DB> {
102105
pub fn float(self, pool: &SharedPool<DB>) -> Floating<'_, Self> {
103106
Floating {
104107
inner: self,
105-
guard: DecrementSizeGuard::new(pool),
108+
// create a new guard from a previously leaked permit
109+
guard: DecrementSizeGuard::new_permit(pool),
106110
}
107111
}
108112

@@ -161,6 +165,11 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
161165
}
162166
}
163167

168+
pub async fn close(self) -> Result<(), Error> {
169+
// `guard` is dropped as intended
170+
self.inner.raw.close().await
171+
}
172+
164173
pub fn detach(self) -> DB::Connection {
165174
self.inner.raw
166175
}
@@ -174,10 +183,14 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
174183
}
175184

176185
impl<'s, DB: Database> Floating<'s, Idle<DB>> {
177-
pub fn from_idle(idle: Idle<DB>, pool: &'s SharedPool<DB>) -> Self {
186+
pub fn from_idle(
187+
idle: Idle<DB>,
188+
pool: &'s SharedPool<DB>,
189+
permit: SemaphoreReleaser<'s>,
190+
) -> Self {
178191
Self {
179192
inner: idle,
180-
guard: DecrementSizeGuard::new(pool),
193+
guard: DecrementSizeGuard::from_permit(pool, permit),
181194
}
182195
}
183196

@@ -192,9 +205,12 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
192205
}
193206
}
194207

195-
pub async fn close(self) -> Result<(), Error> {
208+
pub async fn close(self) -> DecrementSizeGuard<'s> {
196209
// `guard` is dropped as intended
197-
self.inner.live.raw.close().await
210+
if let Err(e) = self.inner.live.raw.close().await {
211+
log::debug!("error occurred while closing the pool connection: {}", e);
212+
}
213+
self.guard
198214
}
199215
}
200216

0 commit comments

Comments
 (0)