Skip to content

Commit d06cc14

Browse files
authored
Merge pull request #7064 from ariesdevil/dev
refactor(session): Remove `SessionRef`
2 parents 5d328ba + 541a91a commit d06cc14

File tree

14 files changed

+58
-179
lines changed

14 files changed

+58
-179
lines changed

src/query/service/src/auth.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use common_users::JwtAuthenticator;
2323
use common_users::UserApiProvider;
2424
use jwtk::Claims;
2525

26-
use crate::sessions::SessionRef;
26+
use crate::sessions::Session;
2727
pub use crate::Config;
2828

2929
pub struct AuthMgr {
@@ -49,7 +49,7 @@ impl AuthMgr {
4949
}))
5050
}
5151

52-
pub async fn auth(&self, session: SessionRef, credential: &Credential) -> Result<()> {
52+
pub async fn auth(&self, session: Arc<Session>, credential: &Credential) -> Result<()> {
5353
let user_info = match credential {
5454
Credential::Jwt {
5555
token: t,
@@ -105,7 +105,7 @@ impl AuthMgr {
105105

106106
async fn process_jwt_claims(
107107
&self,
108-
session: &SessionRef,
108+
session: &Arc<Session>,
109109
claims: &Claims<CustomClaims>,
110110
) -> Result<(String, String)> {
111111
// setup tenant if the JWT claims contain extra.tenant_id

src/query/service/src/servers/http/v1/query/execute_state.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::pipelines::PipelineBuildResult;
4949
use crate::servers::utils::use_planner_v2;
5050
use crate::sessions::QueryAffect;
5151
use crate::sessions::QueryContext;
52-
use crate::sessions::SessionRef;
52+
use crate::sessions::Session;
5353
use crate::sessions::TableContext;
5454
use crate::sql::ColumnBinding;
5555
use crate::sql::DfParser;
@@ -105,7 +105,7 @@ impl ExecuteState {
105105

106106
pub struct ExecuteRunning {
107107
// used to kill query
108-
session: SessionRef,
108+
session: Arc<Session>,
109109
// mainly used to get progress for now
110110
ctx: Arc<QueryContext>,
111111
interpreter: Arc<dyn Interpreter>,
@@ -180,7 +180,7 @@ impl Executor {
180180
impl ExecuteState {
181181
pub(crate) async fn try_create(
182182
request: &HttpQueryRequest,
183-
session: SessionRef,
183+
session: Arc<Session>,
184184
ctx: Arc<QueryContext>,
185185
block_buffer: Arc<BlockBuffer>,
186186
) -> Result<Arc<RwLock<Executor>>> {

src/query/service/src/servers/http/v1/query/expirable.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
1516
use std::time::Duration;
1617
use std::time::Instant;
1718

18-
use crate::sessions::SessionRef;
19+
use crate::sessions::Session;
1920

2021
#[derive(PartialEq, Eq)]
2122
pub enum ExpiringState {
@@ -30,7 +31,7 @@ pub trait Expirable {
3031
fn on_expire(&self);
3132
}
3233

33-
impl Expirable for SessionRef {
34+
impl Expirable for Arc<Session> {
3435
fn expire_state(&self) -> ExpiringState {
3536
if self.is_aborting() {
3637
ExpiringState::Aborted {

src/query/service/src/servers/http/v1/query/http_query_context.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,38 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use poem::FromRequest;
1618
use poem::Request;
1719
use poem::RequestBody;
1820
use poem::Result as PoemResult;
1921

20-
use crate::sessions::SessionRef;
22+
use crate::sessions::Session;
23+
use crate::sessions::SessionManager;
2124
use crate::sessions::SessionType;
2225

2326
pub struct HttpQueryContext {
24-
session: SessionRef,
27+
session: Arc<Session>,
2528
}
2629

2730
impl HttpQueryContext {
28-
pub fn new(session: SessionRef) -> Self {
31+
pub fn new(session: Arc<Session>) -> Self {
2932
HttpQueryContext { session }
3033
}
3134

32-
pub fn get_session(&self, session_type: SessionType) -> SessionRef {
35+
pub fn get_session(&self, session_type: SessionType) -> Arc<Session> {
3336
self.session.set_type(session_type);
3437
self.session.clone()
3538
}
3639
}
3740

41+
impl Drop for HttpQueryContext {
42+
fn drop(&mut self) {
43+
SessionManager::instance().destroy_session(&self.session.get_id())
44+
}
45+
}
46+
3847
#[async_trait::async_trait]
3948
impl<'a> FromRequest<'a> for &'a HttpQueryContext {
4049
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {

src/query/service/src/servers/http/v1/query/http_query_manager.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use super::expiring_map::ExpiringMap;
2929
use super::HttpQueryContext;
3030
use crate::servers::http::v1::query::http_query::HttpQuery;
3131
use crate::servers::http::v1::query::HttpQueryRequest;
32-
use crate::sessions::SessionRef;
32+
use crate::sessions::Session;
3333
use crate::Config;
3434

3535
// TODO(youngsofun): may need refactor later for 2 reasons:
@@ -42,7 +42,7 @@ pub(crate) struct HttpQueryConfig {
4242

4343
pub struct HttpQueryManager {
4444
pub(crate) queries: Arc<RwLock<HashMap<String, Arc<HttpQuery>>>>,
45-
pub(crate) sessions: Mutex<ExpiringMap<String, SessionRef>>,
45+
pub(crate) sessions: Mutex<ExpiringMap<String, Arc<Session>>>,
4646
pub(crate) config: HttpQueryConfig,
4747
}
4848

@@ -117,14 +117,14 @@ impl HttpQueryManager {
117117
q
118118
}
119119

120-
pub(crate) async fn get_session(self: &Arc<Self>, session_id: &str) -> Option<SessionRef> {
120+
pub(crate) async fn get_session(self: &Arc<Self>, session_id: &str) -> Option<Arc<Session>> {
121121
let sessions = self.sessions.lock();
122122
sessions.get(session_id)
123123
}
124124

125-
pub(crate) async fn add_session(self: &Arc<Self>, session: SessionRef, timeout: Duration) {
125+
pub(crate) async fn add_session(self: &Arc<Self>, session: Arc<Session>, timeout: Duration) {
126126
let mut sessions = self.sessions.lock();
127-
sessions.insert(session.get_id(), session.clone(), Some(timeout));
127+
sessions.insert(session.get_id(), session, Some(timeout));
128128
}
129129

130130
pub(crate) fn kill_session(self: &Arc<Self>, session_id: &str) {

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ use crate::servers::mysql::MySQLFederated;
5252
use crate::servers::mysql::MYSQL_VERSION;
5353
use crate::servers::utils::use_planner_v2;
5454
use crate::sessions::QueryContext;
55-
use crate::sessions::SessionRef;
55+
use crate::sessions::Session;
56+
use crate::sessions::SessionManager;
5657
use crate::sessions::TableContext;
5758
use crate::sql::plans::Plan;
5859
use crate::sql::DfParser;
@@ -83,10 +84,16 @@ fn has_result_set_by_plan_node(plan: &PlanNode) -> bool {
8384
}
8485

8586
struct InteractiveWorkerBase<W: AsyncWrite + Send + Unpin> {
86-
session: SessionRef,
87+
session: Arc<Session>,
8788
generic_hold: PhantomData<W>,
8889
}
8990

91+
impl<W: AsyncWrite + Send + Unpin> Drop for InteractiveWorkerBase<W> {
92+
fn drop(&mut self) {
93+
SessionManager::instance().destroy_session(&self.session.get_id())
94+
}
95+
}
96+
9097
pub struct InteractiveWorker<W: AsyncWrite + Send + Unpin> {
9198
base: InteractiveWorkerBase<W>,
9299
version: String,
@@ -486,7 +493,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
486493
}
487494

488495
impl<W: AsyncWrite + Send + Unpin> InteractiveWorker<W> {
489-
pub fn create(session: SessionRef, client_addr: String) -> InteractiveWorker<W> {
496+
pub fn create(session: Arc<Session>, client_addr: String) -> InteractiveWorker<W> {
490497
let mut bs = vec![0u8; 20];
491498
let mut rng = rand::thread_rng();
492499
rng.fill_bytes(bs.as_mut());

src/query/service/src/servers/mysql/mysql_session.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::net::Shutdown;
16+
use std::sync::Arc;
1617

1718
use common_base::base::tokio::io::BufWriter;
1819
use common_base::base::tokio::net::TcpStream;
@@ -27,15 +28,15 @@ use opensrv_mysql::IntermediaryOptions;
2728
use tracing::error;
2829

2930
use crate::servers::mysql::mysql_interactive_worker::InteractiveWorker;
30-
use crate::sessions::SessionRef;
31+
use crate::sessions::Session;
3132

3233
// default size of resultset write buffer: 100KB
3334
const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024;
3435

3536
pub struct MySQLConnection;
3637

3738
impl MySQLConnection {
38-
pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> {
39+
pub fn run_on_stream(session: Arc<Session>, stream: TcpStream) -> Result<()> {
3940
let blocking_stream = Self::convert_stream(stream)?;
4041
MySQLConnection::attach_session(&session, &blocking_stream)?;
4142

@@ -58,7 +59,7 @@ impl MySQLConnection {
5859
Ok(())
5960
}
6061

61-
fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> {
62+
fn attach_session(session: &Arc<Session>, blocking_stream: &std::net::TcpStream) -> Result<()> {
6263
let host = blocking_stream.peer_addr().ok();
6364
let blocking_stream_ref = blocking_stream.try_clone()?;
6465
session.attach(host, move || {

src/query/service/src/sessions/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ mod session_info;
2222
#[allow(clippy::module_inception)]
2323
mod session_mgr;
2424
mod session_mgr_status;
25-
mod session_ref;
2625
mod session_settings;
2726
mod session_status;
2827
mod session_type;
@@ -36,7 +35,6 @@ pub use session_ctx::SessionContext;
3635
pub use session_info::ProcessInfo;
3736
pub use session_mgr::SessionManager;
3837
pub use session_mgr_status::SessionManagerStatus;
39-
pub use session_ref::SessionRef;
4038
pub use session_settings::Settings;
4139
pub use session_status::SessionStatus;
4240
pub use session_type::SessionType;

src/query/service/src/sessions/query_ctx.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ use crate::servers::http::v1::HttpQueryHandle;
6161
use crate::sessions::query_affect::QueryAffect;
6262
use crate::sessions::ProcessInfo;
6363
use crate::sessions::QueryContextShared;
64+
use crate::sessions::Session;
6465
use crate::sessions::SessionManager;
65-
use crate::sessions::SessionRef;
6666
use crate::sessions::Settings;
6767
use crate::sessions::TableContext;
6868
use crate::storages::stage::StageTable;
@@ -190,12 +190,12 @@ impl QueryContext {
190190
}
191191

192192
// Get the current session.
193-
pub fn get_current_session(self: &Arc<Self>) -> SessionRef {
194-
SessionRef::create(self.shared.session.clone())
193+
pub fn get_current_session(self: &Arc<Self>) -> Arc<Session> {
194+
self.shared.session.clone()
195195
}
196196

197197
// Get one session by session id.
198-
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
198+
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
199199
SessionManager::instance().get_session_by_id(id).await
200200
}
201201

src/query/service/src/sessions/session.rs

-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::net::SocketAddr;
16-
use std::sync::atomic::AtomicUsize;
1716
use std::sync::Arc;
1817

1918
use chrono_tz::Tz;
@@ -41,7 +40,6 @@ use crate::Config;
4140
pub struct Session {
4241
pub(in crate::sessions) id: String,
4342
pub(in crate::sessions) typ: RwLock<SessionType>,
44-
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
4543
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
4644
status: Arc<RwLock<SessionStatus>>,
4745
pub(in crate::sessions) mysql_connection_id: Option<u32>,
@@ -54,13 +52,11 @@ impl Session {
5452
session_ctx: Arc<SessionContext>,
5553
mysql_connection_id: Option<u32>,
5654
) -> Result<Arc<Session>> {
57-
let ref_count = Arc::new(AtomicUsize::new(0));
5855
let status = Arc::new(Default::default());
5956
Ok(Arc::new(Session {
6057
id,
6158
typ: RwLock::new(typ),
6259
status,
63-
ref_count,
6460
session_ctx,
6561
mysql_connection_id,
6662
}))

src/query/service/src/sessions/session_mgr.rs

+11-50
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use tracing::debug;
3636
use tracing::info;
3737

3838
use crate::sessions::session::Session;
39-
use crate::sessions::session_ref::SessionRef;
4039
use crate::sessions::ProcessInfo;
4140
use crate::sessions::SessionContext;
4241
use crate::sessions::SessionManagerStatus;
@@ -87,7 +86,7 @@ impl SessionManager {
8786
self.conf.clone()
8887
}
8988

90-
pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef> {
89+
pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<Arc<Session>> {
9190
// TODO: maybe deadlock
9291
let config = self.get_conf();
9392
{
@@ -125,7 +124,7 @@ impl SessionManager {
125124
let user_api = UserApiProvider::instance();
126125
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
127126
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
128-
let session = Session::try_create(id, typ, session_ctx, mysql_conn_id)?;
127+
let session = Session::try_create(id, typ.clone(), session_ctx, mysql_conn_id)?;
129128

130129
let mut sessions = self.active_sessions.write();
131130
if sessions.len() < self.max_sessions {
@@ -135,63 +134,25 @@ impl SessionManager {
135134
&config.query.cluster_id,
136135
);
137136

138-
sessions.insert(session.get_id(), session.clone());
137+
match typ {
138+
SessionType::FlightRPC => {}
139+
_ => {
140+
sessions.insert(session.get_id(), session.clone());
141+
}
142+
}
139143

140-
Ok(SessionRef::create(session))
144+
Ok(session)
141145
} else {
142146
Err(ErrorCode::TooManyUserConnections(
143147
"The current accept connection has exceeded max_active_sessions config",
144148
))
145149
}
146150
}
147151

148-
pub async fn create_rpc_session(
149-
self: &Arc<Self>,
150-
id: String,
151-
aborted: bool,
152-
) -> Result<SessionRef> {
153-
// TODO: maybe deadlock?
154-
let config = self.get_conf();
155-
{
156-
let sessions = self.active_sessions.read();
157-
let v = sessions.get(&id);
158-
if v.is_some() {
159-
return Ok(SessionRef::create(v.unwrap().clone()));
160-
}
161-
}
162-
163-
let tenant = config.query.tenant_id.clone();
164-
let user_api = UserApiProvider::instance();
165-
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
166-
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
167-
let session = Session::try_create(id.clone(), SessionType::FlightRPC, session_ctx, None)?;
168-
169-
let mut sessions = self.active_sessions.write();
170-
let v = sessions.get(&id);
171-
if v.is_none() {
172-
if aborted {
173-
return Err(ErrorCode::AbortedSession("Aborting server."));
174-
}
175-
176-
label_counter(
177-
super::metrics::METRIC_SESSION_CONNECT_NUMBERS,
178-
&config.query.tenant_id,
179-
&config.query.cluster_id,
180-
);
181-
182-
sessions.insert(id, session.clone());
183-
Ok(SessionRef::create(session))
184-
} else {
185-
Ok(SessionRef::create(v.unwrap().clone()))
186-
}
187-
}
188-
189152
#[allow(clippy::ptr_arg)]
190-
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
153+
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
191154
let sessions = self.active_sessions.read();
192-
sessions
193-
.get(id)
194-
.map(|session| SessionRef::create(session.clone()))
155+
sessions.get(id).cloned()
195156
}
196157

197158
#[allow(clippy::ptr_arg)]

0 commit comments

Comments
 (0)