Skip to content

Commit 454694c

Browse files
committed
try fix sqllogic test cluster
1 parent 20a7410 commit 454694c

File tree

1 file changed

+7
-43
lines changed

1 file changed

+7
-43
lines changed

src/query/service/src/sessions/session_mgr.rs

+7-43
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl SessionManager {
124124
let user_api = UserApiProvider::instance();
125125
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
126126
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
127-
let session = Session::try_create(id, typ, session_ctx, mysql_conn_id)?;
127+
let session = Session::try_create(id, typ.clone(), session_ctx, mysql_conn_id)?;
128128

129129
let mut sessions = self.active_sessions.write();
130130
if sessions.len() < self.max_sessions {
@@ -134,7 +134,12 @@ impl SessionManager {
134134
&config.query.cluster_id,
135135
);
136136

137-
sessions.insert(session.get_id(), session.clone());
137+
match typ {
138+
SessionType::FlightRPC => {}
139+
_ => {
140+
sessions.insert(session.get_id(), session.clone());
141+
}
142+
}
138143

139144
Ok(session)
140145
} else {
@@ -144,47 +149,6 @@ impl SessionManager {
144149
}
145150
}
146151

147-
pub async fn create_rpc_session(
148-
self: &Arc<Self>,
149-
id: String,
150-
aborted: bool,
151-
) -> Result<Arc<Session>> {
152-
// TODO: maybe deadlock?
153-
let config = self.get_conf();
154-
{
155-
let sessions = self.active_sessions.read();
156-
let v = sessions.get(&id);
157-
if v.is_some() {
158-
return Ok(v.unwrap().clone());
159-
}
160-
}
161-
162-
let tenant = config.query.tenant_id.clone();
163-
let user_api = UserApiProvider::instance();
164-
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
165-
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
166-
let session = Session::try_create(id.clone(), SessionType::FlightRPC, session_ctx, None)?;
167-
168-
let mut sessions = self.active_sessions.write();
169-
let v = sessions.get(&id);
170-
if v.is_none() {
171-
if aborted {
172-
return Err(ErrorCode::AbortedSession("Aborting server."));
173-
}
174-
175-
label_counter(
176-
super::metrics::METRIC_SESSION_CONNECT_NUMBERS,
177-
&config.query.tenant_id,
178-
&config.query.cluster_id,
179-
);
180-
181-
sessions.insert(id, session.clone());
182-
Ok(session)
183-
} else {
184-
Ok(v.unwrap().clone())
185-
}
186-
}
187-
188152
#[allow(clippy::ptr_arg)]
189153
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
190154
let sessions = self.active_sessions.read();

0 commit comments

Comments
 (0)