-
Notifications
You must be signed in to change notification settings - Fork 19
Async Connection Layer #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -155,6 +155,12 @@ pub struct ClientConfig { | |||||
pub conn_init_thread_num: usize, | ||||||
pub query_concurrency_limit: Option<usize>, | ||||||
|
||||||
pub conn_reader_thread_num: usize, | ||||||
pub conn_writer_thread_num: usize, | ||||||
pub default_thread_num: usize, | ||||||
|
||||||
pub conn_sender_mpsc_channel: usize, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we need a more readable name for this option:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, using max_inflight_reqs_per_conn instead. |
||||||
|
||||||
pub log_level_flag: u16, | ||||||
} | ||||||
|
||||||
|
@@ -199,6 +205,12 @@ impl Default for ClientConfig { | |||||
conn_init_thread_num: 2, | ||||||
query_concurrency_limit: None, | ||||||
|
||||||
conn_reader_thread_num: 4, | ||||||
conn_writer_thread_num: 2, | ||||||
default_thread_num: 2, | ||||||
|
||||||
conn_sender_mpsc_channel: 100, | ||||||
|
||||||
log_level_flag: DEFAULT_FLAG, | ||||||
} | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,8 @@ use crate::{ | |
proxy::Proxy, | ||
Builder as ConnBuilder, | ||
}, | ||
runtime, | ||
runtime::RuntimeRef, | ||
serde_obkv::value::Value, | ||
util::{ | ||
assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs, | ||
|
@@ -193,19 +195,22 @@ struct ObTableClientInner { | |
closed: AtomicBool, | ||
status_mutex: Lock, | ||
|
||
//ServerAddr(all) -> ObTableConnection | ||
// Client Runtimes | ||
runtimes: RuntimesRef, | ||
|
||
// ServerAddr(all) -> ObTableConnection | ||
table_roster: RwLock<HashMap<ObServerAddr, Arc<ObTable>>>, | ||
server_roster: ServerRoster, | ||
running_mode: RunningMode, | ||
//TableName -> TableEntry | ||
// TableName -> TableEntry | ||
table_locations: RwLock<HashMap<String, Arc<TableEntry>>>, | ||
table_mutexs: RwLock<HashMap<String, Arc<Lock>>>, | ||
//TableName -> rowKey element | ||
// TableName -> rowKey element | ||
table_row_key_element: RwLock<HashMap<String, HashMap<String, i32>>>, | ||
connection_pools: RwLock<HashMap<ObServerAddr, Arc<ConnPool>>>, | ||
|
||
_retry_on_change_master: bool, | ||
//TableName -> failure counter | ||
// TableName -> failure counter | ||
table_continuous_failures: RwLock<HashMap<String, Arc<AtomicUsize>>>, | ||
|
||
refresh_metadata_mutex: Lock, | ||
|
@@ -231,6 +236,7 @@ impl ObTableClientInner { | |
database: String, | ||
running_mode: RunningMode, | ||
config: ClientConfig, | ||
runtimes: Arc<ObClientRuntimes>, | ||
) -> Result<Self> { | ||
let conn_init_thread_num = config.conn_init_thread_num; | ||
let ocp_manager = | ||
|
@@ -250,6 +256,7 @@ impl ObTableClientInner { | |
datasource_name: "".to_owned(), | ||
running_mode, | ||
config: config.clone(), | ||
runtimes, | ||
|
||
location: ObTableLocation::new(config), | ||
initialized: AtomicBool::new(false), | ||
|
@@ -444,7 +451,7 @@ impl ObTableClientInner { | |
result.push((part_id, table.clone())); | ||
continue; | ||
} | ||
//Table not found, try to refresh it and retry get it again. | ||
// Table not found, try to refresh it and retry get it again. | ||
warn!("ObTableClientInner::get_tables can not get ob table by address {:?} so that will sync refresh metadata.", | ||
replica_location.addr()); | ||
self.sync_refresh_metadata()?; | ||
|
@@ -599,7 +606,9 @@ impl ObTableClientInner { | |
.tenant_name(&self.tenant_name) | ||
.user_name(&self.user_name) | ||
.database_name(&self.database) | ||
.password(&self.password); | ||
.password(&self.password) | ||
.runtimes(self.runtimes.clone()) | ||
.sender_channel_size(self.config.conn_sender_mpsc_channel); | ||
|
||
let pool = Arc::new( | ||
ConnPoolBuilder::new() | ||
|
@@ -895,15 +904,15 @@ impl ObTableClientInner { | |
refresh: bool, | ||
blocking: bool, | ||
) -> Result<Arc<TableEntry>> { | ||
//Attempt to retrieve it from cache, avoid locking. | ||
// Attempt to retrieve it from cache, avoid locking. | ||
if let Some(table_entry) = self.get_table_entry_from_cache(table_name) { | ||
//If the refresh is false indicates that user tolerate not the latest data | ||
if !refresh || !self.need_refresh_table_entry(&table_entry) { | ||
return Ok(table_entry); | ||
} | ||
} | ||
|
||
//Table entry is none or not refresh | ||
// Table entry is none or not refresh | ||
let table_mutex = { | ||
let table_mutexs = self.table_mutexs.rl(); | ||
match table_mutexs.get(table_name) { | ||
|
@@ -1477,9 +1486,57 @@ impl Drop for ObTableClientInner { | |
} | ||
} | ||
|
||
pub type RuntimesRef = Arc<ObClientRuntimes>; | ||
|
||
/// OBKV Table Runtime | ||
#[derive(Clone, Debug)] | ||
pub struct ObClientRuntimes { | ||
/// Runtime for connection to read data | ||
pub reader_runtime: RuntimeRef, | ||
/// Runtime for connection to write data | ||
pub writer_runtime: RuntimeRef, | ||
/// Runtime for some other tasks | ||
pub default_runtime: RuntimeRef, | ||
} | ||
|
||
impl ObClientRuntimes { | ||
pub fn test_default() -> ObClientRuntimes { | ||
ObClientRuntimes { | ||
reader_runtime: Arc::new(build_runtime("ob-conn-reader", 1)), | ||
writer_runtime: Arc::new(build_runtime("ob-conn-writer", 1)), | ||
default_runtime: Arc::new(build_runtime("ob-default", 1)), | ||
} | ||
} | ||
} | ||
|
||
fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { | ||
runtime::Builder::default() | ||
.worker_threads(threads_num) | ||
.thread_name(name) | ||
.enable_all() | ||
.build() | ||
.expect("Failed to create runtime") | ||
} | ||
|
||
fn build_obkv_runtimes(config: &ClientConfig) -> ObClientRuntimes { | ||
ObClientRuntimes { | ||
reader_runtime: Arc::new(build_runtime( | ||
"ob-conn-reader", | ||
config.conn_reader_thread_num, | ||
)), | ||
writer_runtime: Arc::new(build_runtime( | ||
"ob-conn-writer", | ||
config.conn_writer_thread_num, | ||
)), | ||
default_runtime: Arc::new(build_runtime("ob-default", config.default_thread_num)), | ||
} | ||
} | ||
|
||
/// OBKV Table client | ||
#[derive(Clone)] | ||
#[allow(dead_code)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why dead code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dead_code is for runtimes. Runtimes that we add in this commit are for async refresh_thread_pool, which have not been implemented yet. |
||
pub struct ObTableClient { | ||
runtimes: Arc<ObClientRuntimes>, | ||
inner: Arc<ObTableClientInner>, | ||
refresh_thread_pool: Arc<ScheduledThreadPool>, | ||
} | ||
|
@@ -2326,8 +2383,10 @@ impl Builder { | |
pub fn build(self) -> Result<ObTableClient> { | ||
assert_not_empty(&self.param_url, "Blank param url"); | ||
assert_not_empty(&self.full_user_name, "Blank full user name"); | ||
let runtimes = Arc::new(build_obkv_runtimes(&self.config)); | ||
|
||
Ok(ObTableClient { | ||
runtimes: runtimes.clone(), | ||
inner: Arc::new(ObTableClientInner::internal_new( | ||
self.param_url, | ||
self.full_user_name, | ||
|
@@ -2338,6 +2397,7 @@ impl Builder { | |
self.database, | ||
self.running_mode, | ||
self.config, | ||
runtimes, | ||
)?), | ||
refresh_thread_pool: Arc::new( | ||
ScheduledThreadPool::builder() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find that what the
tokio::net
module uses is justsocket2
, so I guess there is no need to import it explicitly?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To set some parameters like "tcp_keepalive", and "buffer_size"..., it turns out that we have to use socket2 to define a socket that owns those properties to create a Tokio TCP stream. Socket2 is required explicitly, in order to customize a TCP stream.
Learn more from rust-lang/rust#69774 and rust-lang