Skip to content

Commit 7bcf0c5

Browse files
authored
Async Connection Layer (#50)
* [Feat] client runtime * [Feat] asynchronous connections * [Fix] review * [Fix] review * [Fix] review
1 parent f7c7a2c commit 7bcf0c5

File tree

10 files changed

+309
-164
lines changed

10 files changed

+309
-164
lines changed

Cargo.lock

+13-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ serde = "1.0"
5454
serde_bytes = "0.11"
5555
serde_derive = "1.0"
5656
serde_json = "1.0"
57+
socket2 = "0.5"
5758
spin = "0.9"
5859
tokio = { workspace = true }
5960
tokio-util = "0.7"

src/client/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ pub struct ClientConfig {
155155
pub conn_init_thread_num: usize,
156156
pub query_concurrency_limit: Option<usize>,
157157

158+
pub conn_reader_thread_num: usize,
159+
pub conn_writer_thread_num: usize,
160+
pub default_thread_num: usize,
161+
162+
pub max_inflight_reqs_per_conn: usize,
163+
158164
pub log_level_flag: u16,
159165
}
160166

@@ -199,6 +205,12 @@ impl Default for ClientConfig {
199205
conn_init_thread_num: 2,
200206
query_concurrency_limit: None,
201207

208+
conn_reader_thread_num: 4,
209+
conn_writer_thread_num: 2,
210+
default_thread_num: 2,
211+
212+
max_inflight_reqs_per_conn: 100,
213+
202214
log_level_flag: DEFAULT_FLAG,
203215
}
204216
}

src/client/table_client.rs

+68-8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ use crate::{
6363
proxy::Proxy,
6464
Builder as ConnBuilder,
6565
},
66+
runtime,
67+
runtime::RuntimeRef,
6668
serde_obkv::value::Value,
6769
util::{
6870
assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs,
@@ -193,19 +195,22 @@ struct ObTableClientInner {
193195
closed: AtomicBool,
194196
status_mutex: Lock,
195197

196-
//ServerAddr(all) -> ObTableConnection
198+
// Client Runtimes
199+
runtimes: RuntimesRef,
200+
201+
// ServerAddr(all) -> ObTableConnection
197202
table_roster: RwLock<HashMap<ObServerAddr, Arc<ObTable>>>,
198203
server_roster: ServerRoster,
199204
running_mode: RunningMode,
200-
//TableName -> TableEntry
205+
// TableName -> TableEntry
201206
table_locations: RwLock<HashMap<String, Arc<TableEntry>>>,
202207
table_mutexs: RwLock<HashMap<String, Arc<Lock>>>,
203-
//TableName -> rowKey element
208+
// TableName -> rowKey element
204209
table_row_key_element: RwLock<HashMap<String, HashMap<String, i32>>>,
205210
connection_pools: RwLock<HashMap<ObServerAddr, Arc<ConnPool>>>,
206211

207212
_retry_on_change_master: bool,
208-
//TableName -> failure counter
213+
// TableName -> failure counter
209214
table_continuous_failures: RwLock<HashMap<String, Arc<AtomicUsize>>>,
210215

211216
refresh_metadata_mutex: Lock,
@@ -231,6 +236,7 @@ impl ObTableClientInner {
231236
database: String,
232237
running_mode: RunningMode,
233238
config: ClientConfig,
239+
runtimes: Arc<ObClientRuntimes>,
234240
) -> Result<Self> {
235241
let conn_init_thread_num = config.conn_init_thread_num;
236242
let ocp_manager =
@@ -250,6 +256,7 @@ impl ObTableClientInner {
250256
datasource_name: "".to_owned(),
251257
running_mode,
252258
config: config.clone(),
259+
runtimes,
253260

254261
location: ObTableLocation::new(config),
255262
initialized: AtomicBool::new(false),
@@ -444,7 +451,7 @@ impl ObTableClientInner {
444451
result.push((part_id, table.clone()));
445452
continue;
446453
}
447-
//Table not found, try to refresh it and retry get it again.
454+
// Table not found, try to refresh it and retry get it again.
448455
warn!("ObTableClientInner::get_tables can not get ob table by address {:?} so that will sync refresh metadata.",
449456
replica_location.addr());
450457
self.sync_refresh_metadata()?;
@@ -599,7 +606,9 @@ impl ObTableClientInner {
599606
.tenant_name(&self.tenant_name)
600607
.user_name(&self.user_name)
601608
.database_name(&self.database)
602-
.password(&self.password);
609+
.password(&self.password)
610+
.runtimes(self.runtimes.clone())
611+
.sender_channel_size(self.config.max_inflight_reqs_per_conn);
603612

604613
let pool = Arc::new(
605614
ConnPoolBuilder::new()
@@ -895,15 +904,15 @@ impl ObTableClientInner {
895904
refresh: bool,
896905
blocking: bool,
897906
) -> Result<Arc<TableEntry>> {
898-
//Attempt to retrieve it from cache, avoid locking.
907+
// Attempt to retrieve it from cache, avoid locking.
899908
if let Some(table_entry) = self.get_table_entry_from_cache(table_name) {
900909
//If the refresh is false indicates that user tolerate not the latest data
901910
if !refresh || !self.need_refresh_table_entry(&table_entry) {
902911
return Ok(table_entry);
903912
}
904913
}
905914

906-
//Table entry is none or not refresh
915+
// Table entry is none or not refresh
907916
let table_mutex = {
908917
let table_mutexs = self.table_mutexs.rl();
909918
match table_mutexs.get(table_name) {
@@ -1477,9 +1486,57 @@ impl Drop for ObTableClientInner {
14771486
}
14781487
}
14791488

1489+
pub type RuntimesRef = Arc<ObClientRuntimes>;
1490+
1491+
/// OBKV Table Runtime
1492+
#[derive(Clone, Debug)]
1493+
pub struct ObClientRuntimes {
1494+
/// Runtime for connection to read data
1495+
pub reader_runtime: RuntimeRef,
1496+
/// Runtime for connection to write data
1497+
pub writer_runtime: RuntimeRef,
1498+
/// Runtime for some other tasks
1499+
pub default_runtime: RuntimeRef,
1500+
}
1501+
1502+
impl ObClientRuntimes {
1503+
pub fn test_default() -> ObClientRuntimes {
1504+
ObClientRuntimes {
1505+
reader_runtime: Arc::new(build_runtime("ob-conn-reader", 1)),
1506+
writer_runtime: Arc::new(build_runtime("ob-conn-writer", 1)),
1507+
default_runtime: Arc::new(build_runtime("ob-default", 1)),
1508+
}
1509+
}
1510+
}
1511+
1512+
fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
1513+
runtime::Builder::default()
1514+
.worker_threads(threads_num)
1515+
.thread_name(name)
1516+
.enable_all()
1517+
.build()
1518+
.expect("Failed to create runtime")
1519+
}
1520+
1521+
fn build_obkv_runtimes(config: &ClientConfig) -> ObClientRuntimes {
1522+
ObClientRuntimes {
1523+
reader_runtime: Arc::new(build_runtime(
1524+
"ob-conn-reader",
1525+
config.conn_reader_thread_num,
1526+
)),
1527+
writer_runtime: Arc::new(build_runtime(
1528+
"ob-conn-writer",
1529+
config.conn_writer_thread_num,
1530+
)),
1531+
default_runtime: Arc::new(build_runtime("ob-default", config.default_thread_num)),
1532+
}
1533+
}
1534+
14801535
/// OBKV Table client
14811536
#[derive(Clone)]
1537+
#[allow(dead_code)]
14821538
pub struct ObTableClient {
1539+
runtimes: Arc<ObClientRuntimes>,
14831540
inner: Arc<ObTableClientInner>,
14841541
refresh_thread_pool: Arc<ScheduledThreadPool>,
14851542
}
@@ -2326,8 +2383,10 @@ impl Builder {
23262383
pub fn build(self) -> Result<ObTableClient> {
23272384
assert_not_empty(&self.param_url, "Blank param url");
23282385
assert_not_empty(&self.full_user_name, "Blank full user name");
2386+
let runtimes = Arc::new(build_obkv_runtimes(&self.config));
23292387

23302388
Ok(ObTableClient {
2389+
runtimes: runtimes.clone(),
23312390
inner: Arc::new(ObTableClientInner::internal_new(
23322391
self.param_url,
23332392
self.full_user_name,
@@ -2338,6 +2397,7 @@ impl Builder {
23382397
self.database,
23392398
self.running_mode,
23402399
self.config,
2400+
runtimes,
23412401
)?),
23422402
refresh_thread_pool: Arc::new(
23432403
ScheduledThreadPool::builder()

src/rpc/conn_pool.rs

+2
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ mod test {
425425
use std::sync::atomic::Ordering;
426426

427427
use super::*;
428+
use crate::client::table_client::ObClientRuntimes;
428429

429430
fn gen_test_conn_builder() -> ConnBuilder {
430431
ConnBuilder::new()
@@ -439,6 +440,7 @@ mod test {
439440
.user_name("test")
440441
.database_name("test")
441442
.password("test")
443+
.runtimes(Arc::new(ObClientRuntimes::test_default()))
442444
}
443445

444446
fn gen_test_conn_pool(min_conn_num: usize, max_conn_num: usize) -> ConnPool {

0 commit comments

Comments
 (0)