From f41a236855b2884710c9d431eaff7fb5b55848ea Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Fri, 19 May 2023 18:33:47 +0800 Subject: [PATCH 1/5] [Feat] client runtime --- src/client/mod.rs | 8 ++++ src/client/table_client.rs | 94 +++++++++++++++++++++++++------------- 2 files changed, 71 insertions(+), 31 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 3c19427..ef6c2d1 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -155,6 +155,10 @@ pub struct ClientConfig { pub conn_init_thread_num: usize, pub query_concurrency_limit: Option, + pub conn_reader_threads: usize, + pub conn_writer_threads: usize, + pub default_threads_num: usize, + pub log_level_flag: u16, } @@ -199,6 +203,10 @@ impl Default for ClientConfig { conn_init_thread_num: 2, query_concurrency_limit: None, + conn_reader_threads: 2, + conn_writer_threads: 2, + default_threads_num: 2, + log_level_flag: DEFAULT_FLAG, } } diff --git a/src/client/table_client.rs b/src/client/table_client.rs index 48fde47..3415f50 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -38,39 +38,32 @@ use super::{ table::{self, ObTable}, ClientConfig, Table, TableOpResult, }; -use crate::{ - error::{self, CommonErrCode, Error::Common as CommonErr, Result}, - location::{ - ob_part_constants::generate_phy_part_id, ObPartitionLevel, ObServerAddr, ObTableLocation, - ReplicaLocation, TableEntry, TableEntryKey, - }, - monitors::{ - client_metrics::{ClientMetrics, ObClientOpRecordType, ObClientOpRetryType}, - prometheus::OBKV_CLIENT_REGISTRY, - }, - rpc::{ - conn_pool::{Builder as ConnPoolBuilder, ConnPool}, - protocol::{ - payloads::{ - ObTableBatchOperation, ObTableEntityType, ObTableOperationRequest, - ObTableOperationResult, ObTableOperationType, - }, - query::{ - ObHTableFilter, ObNewRange, ObScanOrder, ObTableQuery, ObTableQueryRequest, - ObTableQueryResult, ObTableStreamRequest, - }, +use crate::{error::{self, CommonErrCode, Error::Common as CommonErr, Result}, location::{ + ob_part_constants::generate_phy_part_id, ObPartitionLevel, ObServerAddr, ObTableLocation, + ReplicaLocation, TableEntry, TableEntryKey, +}, monitors::{ + client_metrics::{ClientMetrics, ObClientOpRecordType, ObClientOpRetryType}, + prometheus::OBKV_CLIENT_REGISTRY, +}, rpc::{ + conn_pool::{Builder as ConnPoolBuilder, ConnPool}, + protocol::{ + payloads::{ + ObTableBatchOperation, ObTableEntityType, ObTableOperationRequest, + ObTableOperationResult, ObTableOperationType, + }, + query::{ + ObHTableFilter, ObNewRange, ObScanOrder, ObTableQuery, ObTableQueryRequest, + ObTableQueryResult, ObTableStreamRequest, }, - proxy::Proxy, - Builder as ConnBuilder, - }, - serde_obkv::value::Value, - util::{ - assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs, - permit::{PermitGuard, Permits}, - HandyRwLock, }, - ResultCodes, -}; + proxy::Proxy, + Builder as ConnBuilder, +}, serde_obkv::value::Value, util::{ + assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs, + permit::{PermitGuard, Permits}, + HandyRwLock, +}, ResultCodes, runtime}; +use crate::runtime::RuntimeRef; lazy_static! { pub static ref OBKV_CLIENT_METRICS: ClientMetrics = { @@ -175,6 +168,7 @@ pub enum RunningMode { type Lock = Mutex; // ObTableClient inner implemetation. +#[allow(dead_code)] struct ObTableClientInner { location: ObTableLocation, ocp_manager: ObOcpModelManager, @@ -193,6 +187,9 @@ struct ObTableClientInner { closed: AtomicBool, status_mutex: Lock, + /// Client Runtimes + runtimes: Arc, + //ServerAddr(all) -> ObTableConnection table_roster: RwLock>>, server_roster: ServerRoster, @@ -231,6 +228,7 @@ impl ObTableClientInner { database: String, running_mode: RunningMode, config: ClientConfig, + runtimes: Arc, ) -> Result { let conn_init_thread_num = config.conn_init_thread_num; let ocp_manager = @@ -250,6 +248,7 @@ impl ObTableClientInner { datasource_name: "".to_owned(), running_mode, config: config.clone(), + runtimes: runtimes.clone(), location: ObTableLocation::new(config), initialized: AtomicBool::new(false), @@ -1477,9 +1476,39 @@ impl Drop for ObTableClientInner { } } +/// OBKV Table Runtime +#[derive(Clone, Debug)] +pub struct ObClientRuntimes { + /// Runtime for connection to read data + pub reader_runtime: RuntimeRef, + /// Runtime for connection to write data + pub writer_runtime: RuntimeRef, + /// Runtime for some other tasks + pub default_runtime: RuntimeRef, +} + +fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { + runtime::Builder::default() + .worker_threads(threads_num) + .thread_name(name) + .enable_all() + .build() + .expect("Failed to create runtime") +} + +fn build_obkv_runtimes(config: &ClientConfig) -> ObClientRuntimes { + ObClientRuntimes { + reader_runtime: Arc::new(build_runtime("ob-conn-read", config.conn_reader_threads)), + writer_runtime: Arc::new(build_runtime("ob-conn-write", config.conn_writer_threads)), + default_runtime: Arc::new(build_runtime("ceres-default", config.default_threads_num)), + } +} + /// OBKV Table client #[derive(Clone)] +#[allow(dead_code)] pub struct ObTableClient { + runtimes: Arc, inner: Arc, refresh_thread_pool: Arc, } @@ -2326,8 +2355,10 @@ impl Builder { pub fn build(self) -> Result { assert_not_empty(&self.param_url, "Blank param url"); assert_not_empty(&self.full_user_name, "Blank full user name"); + let runtimes = Arc::new(build_obkv_runtimes(&self.config)); Ok(ObTableClient { + runtimes: runtimes.clone(), inner: Arc::new(ObTableClientInner::internal_new( self.param_url, self.full_user_name, @@ -2338,6 +2369,7 @@ impl Builder { self.database, self.running_mode, self.config, + runtimes.clone(), )?), refresh_thread_pool: Arc::new( ScheduledThreadPool::builder() From 78e5b543fbe4435b3f5404bc753dc12b36f92728 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Wed, 24 May 2023 17:01:48 +0800 Subject: [PATCH 2/5] [Feat] asynchronous connections --- Cargo.lock | 15 +- Cargo.toml | 1 + src/client/mod.rs | 4 +- src/client/table_client.rs | 80 ++++--- src/rpc/conn_pool.rs | 2 + src/rpc/mod.rs | 301 ++++++++++++++------------- tests/test_table_client_base.rs | 2 +- ycsb-rs/src/obkv_client.rs | 8 + ycsb-rs/src/properties.rs | 18 ++ ycsb-rs/workloads/workload_obkv.toml | 9 +- 10 files changed, 261 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b107b25..c92ccbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -835,7 +835,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -1243,6 +1243,7 @@ dependencies = [ "serde_json", "serial_test", "serial_test_derive", + "socket2 0.5.3", "spin 0.9.8", "tempfile", "test-log", @@ -1984,6 +1985,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -2221,7 +2232,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.9", "tokio-macros", "windows-sys 0.48.0", ] diff --git a/Cargo.toml b/Cargo.toml index 955feac..165260d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ serde = "1.0" serde_bytes = "0.11" serde_derive = "1.0" serde_json = "1.0" +socket2 = "0.5" spin = "0.9" tokio = { workspace = true } tokio-util = "0.7" diff --git a/src/client/mod.rs b/src/client/mod.rs index ef6c2d1..3bf3e67 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -203,8 +203,8 @@ impl Default for ClientConfig { conn_init_thread_num: 2, query_concurrency_limit: None, - conn_reader_threads: 2, - conn_writer_threads: 2, + conn_reader_threads: 6, + conn_writer_threads: 4, default_threads_num: 2, log_level_flag: DEFAULT_FLAG, diff --git a/src/client/table_client.rs b/src/client/table_client.rs index 3415f50..e63f0ca 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -38,32 +38,41 @@ use super::{ table::{self, ObTable}, ClientConfig, Table, TableOpResult, }; -use crate::{error::{self, CommonErrCode, Error::Common as CommonErr, Result}, location::{ - ob_part_constants::generate_phy_part_id, ObPartitionLevel, ObServerAddr, ObTableLocation, - ReplicaLocation, TableEntry, TableEntryKey, -}, monitors::{ - client_metrics::{ClientMetrics, ObClientOpRecordType, ObClientOpRetryType}, - prometheus::OBKV_CLIENT_REGISTRY, -}, rpc::{ - conn_pool::{Builder as ConnPoolBuilder, ConnPool}, - protocol::{ - payloads::{ - ObTableBatchOperation, ObTableEntityType, ObTableOperationRequest, - ObTableOperationResult, ObTableOperationType, - }, - query::{ - ObHTableFilter, ObNewRange, ObScanOrder, ObTableQuery, ObTableQueryRequest, - ObTableQueryResult, ObTableStreamRequest, +use crate::{ + error::{self, CommonErrCode, Error::Common as CommonErr, Result}, + location::{ + ob_part_constants::generate_phy_part_id, ObPartitionLevel, ObServerAddr, ObTableLocation, + ReplicaLocation, TableEntry, TableEntryKey, + }, + monitors::{ + client_metrics::{ClientMetrics, ObClientOpRecordType, ObClientOpRetryType}, + prometheus::OBKV_CLIENT_REGISTRY, + }, + rpc::{ + conn_pool::{Builder as ConnPoolBuilder, ConnPool}, + protocol::{ + payloads::{ + ObTableBatchOperation, ObTableEntityType, ObTableOperationRequest, + ObTableOperationResult, ObTableOperationType, + }, + query::{ + ObHTableFilter, ObNewRange, ObScanOrder, ObTableQuery, ObTableQueryRequest, + ObTableQueryResult, ObTableStreamRequest, + }, }, + proxy::Proxy, + Builder as ConnBuilder, + }, + runtime, + runtime::RuntimeRef, + serde_obkv::value::Value, + util::{ + assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs, + permit::{PermitGuard, Permits}, + HandyRwLock, }, - proxy::Proxy, - Builder as ConnBuilder, -}, serde_obkv::value::Value, util::{ - assert_not_empty, current_time_millis, duration_to_millis, millis_to_secs, - permit::{PermitGuard, Permits}, - HandyRwLock, -}, ResultCodes, runtime}; -use crate::runtime::RuntimeRef; + ResultCodes, +}; lazy_static! { pub static ref OBKV_CLIENT_METRICS: ClientMetrics = { @@ -188,7 +197,7 @@ struct ObTableClientInner { status_mutex: Lock, /// Client Runtimes - runtimes: Arc, + runtimes: RuntimesRef, //ServerAddr(all) -> ObTableConnection table_roster: RwLock>>, @@ -248,7 +257,7 @@ impl ObTableClientInner { datasource_name: "".to_owned(), running_mode, config: config.clone(), - runtimes: runtimes.clone(), + runtimes, location: ObTableLocation::new(config), initialized: AtomicBool::new(false), @@ -598,7 +607,8 @@ impl ObTableClientInner { .tenant_name(&self.tenant_name) .user_name(&self.user_name) .database_name(&self.database) - .password(&self.password); + .password(&self.password) + .runtimes(self.runtimes.clone()); let pool = Arc::new( ConnPoolBuilder::new() @@ -1476,6 +1486,8 @@ impl Drop for ObTableClientInner { } } +pub type RuntimesRef = Arc; + /// OBKV Table Runtime #[derive(Clone, Debug)] pub struct ObClientRuntimes { @@ -1487,6 +1499,16 @@ pub struct ObClientRuntimes { pub default_runtime: RuntimeRef, } +impl ObClientRuntimes { + pub fn test_default() -> ObClientRuntimes { + ObClientRuntimes { + reader_runtime: Arc::new(build_runtime("ob-conn-read", 1)), + writer_runtime: Arc::new(build_runtime("ob-conn-write", 1)), + default_runtime: Arc::new(build_runtime("ob-default", 1)), + } + } +} + fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { runtime::Builder::default() .worker_threads(threads_num) @@ -1500,7 +1522,7 @@ fn build_obkv_runtimes(config: &ClientConfig) -> ObClientRuntimes { ObClientRuntimes { reader_runtime: Arc::new(build_runtime("ob-conn-read", config.conn_reader_threads)), writer_runtime: Arc::new(build_runtime("ob-conn-write", config.conn_writer_threads)), - default_runtime: Arc::new(build_runtime("ceres-default", config.default_threads_num)), + default_runtime: Arc::new(build_runtime("ob-default", config.default_threads_num)), } } @@ -2369,7 +2391,7 @@ impl Builder { self.database, self.running_mode, self.config, - runtimes.clone(), + runtimes, )?), refresh_thread_pool: Arc::new( ScheduledThreadPool::builder() diff --git a/src/rpc/conn_pool.rs b/src/rpc/conn_pool.rs index 1ef13aa..7b3485c 100644 --- a/src/rpc/conn_pool.rs +++ b/src/rpc/conn_pool.rs @@ -425,6 +425,7 @@ mod test { use std::sync::atomic::Ordering; use super::*; + use crate::client::table_client::ObClientRuntimes; fn gen_test_conn_builder() -> ConnBuilder { ConnBuilder::new() @@ -439,6 +440,7 @@ mod test { .user_name("test") .database_name("test") .password("test") + .runtimes(Arc::new(ObClientRuntimes::test_default())) } fn gen_test_conn_pool(min_conn_num: usize, max_conn_num: usize) -> ConnPool { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index e7263c7..261b4db 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -22,22 +22,28 @@ pub mod util; use std::{ collections::HashMap, - io::{ErrorKind, Read, Write}, mem, - net::{Shutdown, SocketAddr, TcpStream, ToSocketAddrs}, + net::{SocketAddr, ToSocketAddrs}, ops::Drop, sync::{ atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, Arc, Mutex, }, - thread::{self, JoinHandle}, time::{Duration, Instant}, }; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; -use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; -use net2::{TcpBuilder, TcpStreamExt}; +use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpSocket, TcpStream, + }, + sync::{mpsc, oneshot}, + time::{timeout as TokioTimeout, Duration as TokioDuration}, +}; use tokio_util::codec::{Decoder, Encoder}; use uuid::Uuid; @@ -47,9 +53,11 @@ use self::protocol::{ ProtoEncoder, TransportCode, HEADER_SIZE, }; use crate::{ + client::table_client::RuntimesRef, error::{CommonErrCode, Error, Error::Common as CommonErr, Result}, monitors::{prometheus::OBKV_CLIENT_REGISTRY, rpc_metrics::RpcMetrics}, rpc::{protocol::TraceId, util::checksum::ob_crc64::ObCrc64Sse42}, + runtime::{AbortOnDropMany, JoinHandle, RuntimeRef}, }; lazy_static! { @@ -60,37 +68,38 @@ lazy_static! { }; } -type RequestsMap = Arc>>>>; +type RequestsMap = Arc>>>>; const CONN_CONTINUOUS_TIMEOUT_CEILING: usize = 10; ///Send component of OBKV connection. #[derive(Debug)] pub struct ConnectionSender { - sender: Sender, + sender: mpsc::Sender, writer: Option>>, + default_runtime: RuntimeRef, // TODO: remove this } impl ConnectionSender { fn new( - write_stream: TcpStream, + write_stream: OwnedWriteHalf, requests: RequestsMap, active: Arc, + sender_runtime: RuntimeRef, + default_runtime: RuntimeRef, ) -> ConnectionSender { - let (sender, receiver): (Sender, Receiver) = unbounded(); + let (sender, mut receiver): (mpsc::Sender, mpsc::Receiver) = + mpsc::channel(100); let mut codec = ObTablePacketCodec::new(); - let writer = thread::Builder::new() - .name("conn_writer".to_owned()) - .spawn(move || { + let writer = sender_runtime.spawn(async move { let mut buf = BytesMut::with_capacity(1024); let mut write_stream = write_stream; let addr = write_stream.peer_addr()?; loop { - OBKV_RPC_METRICS.observe_rpc_misc("request_queue_size", receiver.len() as f64); let start = Instant::now(); - match receiver.recv() { - Ok(packet) => { + match receiver.recv().await { + Some(packet) => { OBKV_RPC_METRICS.observe_rpc_duration( "reveiver_recv_time", start.elapsed(), @@ -105,7 +114,7 @@ impl ConnectionSender { Ok(()) => { OBKV_RPC_METRICS.observe_rpc_misc("write_bytes", buf.len() as f64); let start = Instant::now(); - match write_stream.write_all(&buf) { + match write_stream.write_all(&buf).await { Ok(()) => { OBKV_RPC_METRICS.observe_rpc_duration( "socket_write", @@ -114,9 +123,9 @@ impl ConnectionSender { } Err(e) => { error!( - "Fail to write packet into stream connected to {}, err: {}", - addr, e - ); + "Fail to write packet into stream connected to {}, err: {}", + addr, e + ); break; } } @@ -138,8 +147,8 @@ impl ConnectionSender { }, } } - Err(e) => { - error!("Error in connection sender Receiver::recv, {}", e); + None => { + error!("Sender channel has been closed"); break; } } @@ -147,7 +156,7 @@ impl ConnectionSender { active.store(false, Ordering::Release); - if let Err(err) = write_stream.shutdown(Shutdown::Write) { + if let Err(err) = write_stream.shutdown().await { error!("Fail to close write stream to {}, err:{}", addr, err); } @@ -156,11 +165,12 @@ impl ConnectionSender { info!("Close write stream for connection to {}", addr); Ok(()) - }).expect("Fail to create connection_writer thread"); + }); ConnectionSender { sender, writer: Some(writer), + default_runtime, } } @@ -169,27 +179,16 @@ impl ConnectionSender { ///It can fail only when connection gets closed. ///Which means OBKV connection is no longer valid. pub fn request(&self, message: ObTablePacket) -> Result<()> { - self.sender.send(message).map_err(Self::broken_pipe) + // TODO: remove block_on with sender.send().await + self.default_runtime + .block_on(async move { self.sender.send(message).await.map_err(Self::broken_pipe) }) } fn close(&mut self) -> Result<()> { self.request(ObTablePacket::ClosePoison)?; let writer = mem::replace(&mut self.writer, None); - - match writer.unwrap().join() { - Ok(_) => (), - Err(e) => { - error!( - "ConnectionSender::close fail to join on writer, err={:?}", - e - ); - return Err(CommonErr( - CommonErrCode::Rpc, - "ConnectionSender::close fail to join on writer.".to_owned(), - )); - } - } - + let drop_helper = AbortOnDropMany(vec![writer.unwrap()]); + drop(drop_helper); Ok(()) } @@ -207,7 +206,7 @@ pub struct Connection { //remote addr addr: SocketAddr, reader: Option>>, - reader_signal_sender: Sender<()>, + reader_signal_sender: mpsc::Sender<()>, sender: ConnectionSender, requests: RequestsMap, continuous_timeout_failures: AtomicUsize, @@ -218,11 +217,12 @@ pub struct Connection { id: u32, trace_id_counter: AtomicU32, load: AtomicUsize, + // TODO: check unused runtime + runtimes: RuntimesRef, } const OB_MYSQL_MAX_PACKET_LENGTH: usize = 1 << 24; const READ_BUF_SIZE: usize = 1 << 16; -const DEFAULT_STACK_SIZE: usize = 2 * 1024 * 1024; struct LoadCounter<'a>(&'a AtomicUsize); @@ -240,40 +240,44 @@ impl<'a> Drop for LoadCounter<'a> { } impl Connection { - fn internal_new(id: u32, addr: SocketAddr, stream: TcpStream) -> Result { + fn internal_new( + id: u32, + addr: SocketAddr, + stream: TcpStream, + runtimes: RuntimesRef, + ) -> Result { let requests: RequestsMap = Arc::new(Mutex::new(HashMap::new())); let read_requests = requests.clone(); - let read_stream = stream.try_clone()?; + let (read_stream, write_stream) = stream.into_split(); let active = Arc::new(AtomicBool::new(false)); let read_active = active.clone(); - let (sender, receiver): (Sender<()>, Receiver<()>) = unbounded(); - - let join_handle = thread::Builder::new() - .name("conn_reader".to_owned()) - .stack_size(OB_MYSQL_MAX_PACKET_LENGTH + DEFAULT_STACK_SIZE) - .spawn(move || { - let addr = read_stream.peer_addr()?; - - Connection::process_reading_data( - receiver, - read_stream, - read_requests.clone(), - &addr, - ); + let (sender, receiver): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel(1); - read_active.store(false, Ordering::Release); - Connection::cancel_requests(&read_requests); + let join_handle = runtimes.reader_runtime.spawn(async move { + let addr = read_stream.peer_addr()?; - info!("Close read stream for connection to {}", addr); - Ok(()) - })?; + Connection::process_reading_data(receiver, read_stream, read_requests.clone(), &addr) + .await; + + read_active.store(false, Ordering::Release); + Connection::cancel_requests(&read_requests); + + error!("Close read stream for connection to {}", addr); + Ok(()) + }); Ok(Connection { addr, reader: Some(join_handle), - sender: ConnectionSender::new(stream, requests.clone(), active.clone()), + sender: ConnectionSender::new( + write_stream, + requests.clone(), + active.clone(), + runtimes.writer_runtime.clone(), + runtimes.default_runtime.clone(), + ), requests, continuous_timeout_failures: AtomicUsize::new(0), continuous_timeout_failures_ceiling: CONN_CONTINUOUS_TIMEOUT_CEILING, @@ -284,6 +288,7 @@ impl Connection { id, trace_id_counter: AtomicU32::new(0), load: AtomicUsize::new(0), + runtimes: runtimes.clone(), }) } @@ -291,9 +296,9 @@ impl Connection { self.load.load(Ordering::Relaxed) } - fn process_reading_data( - signal_receiver: Receiver<()>, - mut read_stream: TcpStream, + async fn process_reading_data( + mut signal_receiver: mpsc::Receiver<()>, + mut read_stream: OwnedReadHalf, read_requests: RequestsMap, addr: &SocketAddr, ) { @@ -302,12 +307,13 @@ impl Connection { let mut buf = BytesMut::with_capacity(READ_BUF_SIZE); loop { if let Ok(()) = signal_receiver.try_recv() { + error!("DEBUG: out0"); break; } let start = Instant::now(); - match read_stream.read(&mut read_buf) { + match read_stream.read(&mut read_buf).await { Ok(size) => { OBKV_RPC_METRICS.observe_rpc_duration("socket_read", start.elapsed()); OBKV_RPC_METRICS.observe_rpc_misc("read_bytes", size as f64); @@ -330,7 +336,8 @@ impl Connection { if !Self::decode_packets(&mut codec, &mut buf, &read_requests, addr) { break; } - OBKV_RPC_METRICS.observe_rpc_duration("decode_responses", start.elapsed()); + OBKV_RPC_METRICS + .observe_rpc_duration("decode_responses_time", start.elapsed()); } else { info!( "Connection::process_reading_data read zero bytes, \ @@ -340,12 +347,6 @@ impl Connection { break; } } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - let start = Instant::now(); - thread::yield_now(); - OBKV_RPC_METRICS.observe_rpc_duration("yield_thread", start.elapsed()); - continue; - } Err(e) => { error!( "Connection::process_reading_data encountered IO error: {} for addr {}.", @@ -355,28 +356,23 @@ impl Connection { } } } - if let Err(err) = read_stream.shutdown(Shutdown::Read) { - warn!( - "Connection::process_reading_data fail to close read stream to {}, err:{}", - addr, err - ); - } + // TODO: stream close + drop(read_stream); } fn cancel_requests(requests: &RequestsMap) { let mut requests = requests.lock().unwrap(); - for (_, sender) in requests.iter() { + for (_, sender) in requests.drain() { if let Err(e) = sender.send(Err(CommonErr( CommonErrCode::Rpc, "connection reader exits".to_owned(), ))) { error!( "Connection::cancel_requests: fail to send cancel message, err:{}", - e + e.err().unwrap().to_string() ); } } - requests.clear(); } fn decode_packets( @@ -509,7 +505,7 @@ impl Connection { let start = Instant::now(); - let timeout = Duration::from_millis(payload.timeout_millis() as u64); + let timeout = TokioDuration::from_millis(payload.timeout_millis() as u64); payload.set_tenant_id(self.tenant_id); if let Some(ref cred) = self.credential { @@ -544,42 +540,45 @@ impl Connection { let rx = self.send(req, channel_id)?; if payload.timeout_millis() == 0 { - // no-wait request,return Ok directly + // no-wait request,return Ok directly2 return Ok(()); } - let resp = match rx.recv_timeout(timeout) { - Ok(resp) => { - self.on_recv_in_time(); - resp.map_err(|e| { - error!( + // TODO: remove block_on with rx.await + let resp = self.runtimes.default_runtime.block_on(async move { + match TokioTimeout(timeout, rx).await { + Ok(resp) => { + self.on_recv_in_time(); + resp.map_err(|e| { + error!( "Connection::execute: fail to fetch rpc response, addr:{}, trace_id:{}, err:{}", self.addr, trace_id, e ); - e - })? - } - Err(err) => { - error!( - "Connection::execute: wait for rpc response timeout, addr:{}, trace_id:{}, err:{}", - self.addr, trace_id, err - ); + e + }) + } + Err(err) => { + error!( + "Connection::execute: wait for rpc response timeout, addr:{}, trace_id:{}, err:{}", + self.addr, trace_id, err + ); - self.on_recv_timeout(); - return Err(CommonErr( - CommonErrCode::Rpc, - format!("wait for rpc response timeout, err:{err}"), - )); - } - }; + self.on_recv_timeout(); + return Err(CommonErr( + CommonErrCode::Rpc, + format!("wait for rpc response timeout, err:{err}"), + )); + } + }.expect("Tokio timeout panics, may be there is no current timer set") + }); match resp { - ObTablePacket::ServerPacket { + Ok(ObTablePacket::ServerPacket { id: _id, header, mut content, code: _code, - } => { + }) => { let header = header.unwrap(); let server_trace_id = header.trace_id(); response.set_header(header); @@ -602,7 +601,7 @@ impl Connection { OBKV_RPC_METRICS.observe_rpc_duration("execute_payload", start.elapsed()); Ok(()) } - ObTablePacket::TransportPacket { error, code } => Err(CommonErr( + Ok(ObTablePacket::TransportPacket { error, code }) => Err(CommonErr( CommonErrCode::Rpc, format!("transport code: [{code:?}], error: [{error}]"), )), @@ -637,7 +636,7 @@ impl Connection { self.execute(&mut payload, &mut login_result)?; - debug!("Connection::login login result {:?}", login_result); + debug!("Connection::login, login result {:?}", login_result); self.credential = Some(login_result.take_credential()); self.tenant_id = Some(login_result.tenant_id()); @@ -698,11 +697,13 @@ impl Connection { } //2. close reader - if let Err(e) = self - .reader_signal_sender - .send(()) - .map_err(ConnectionSender::broken_pipe) - { + //TODO: remove block_on + if let Err(e) = self.runtimes.default_runtime.block_on(async { + self.reader_signal_sender + .send(()) + .await + .map_err(ConnectionSender::broken_pipe) + }) { error!( "Connection::close fail to send signal to reader, err: {}.", e @@ -710,16 +711,8 @@ impl Connection { } let reader = mem::replace(&mut self.reader, None); - match reader.unwrap().join() { - Ok(_) => (), - Err(e) => { - error!("Connection::close fail to join on reader, err={:?}", e); - return Err(CommonErr( - CommonErrCode::Rpc, - "Connection::close fail to join on reader.".to_owned(), - )); - } - } + + drop(reader); Ok(()) } @@ -733,8 +726,8 @@ impl Connection { &self, message: ObTablePacket, channel_id: i32, - ) -> Result>> { - let (tx, rx) = bounded(1); + ) -> Result>> { + let (tx, rx) = oneshot::channel(); self.requests.lock().unwrap().insert(channel_id, tx); self.sender.request(message).map_err(|e| { error!("Connection::send: fail to send message, err:{}", e); @@ -787,6 +780,8 @@ pub struct Builder { user_name: String, database_name: String, password: String, + + runtimes: Option, } const SOCKET_KEEP_ALIVE_SECS: u64 = 15 * 60; @@ -804,6 +799,7 @@ impl Builder { user_name: "".to_owned(), database_name: "".to_owned(), password: "".to_owned(), + runtimes: None, } } @@ -857,6 +853,11 @@ impl Builder { self } + pub fn runtimes(mut self, runtimes: RuntimesRef) -> Self { + self.runtimes = Some(runtimes); + self + } + pub fn build(self) -> Result { let uuid = Uuid::new_v4(); let id = BigEndian::read_u32(uuid.as_bytes()); @@ -869,24 +870,40 @@ impl Builder { if let Some(addr) = addr { let start = Instant::now(); - let tcp = TcpBuilder::new_v4().unwrap(); - // Set socket connect timeout - TcpStream::connect_timeout(&addr, self.connect_timeout)?; - - tcp.reuse_address(true)?; - - let stream = tcp.connect(addr)?; + let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?; + + socket2_socket.set_nodelay(true)?; + socket2_socket.set_reuse_address(true)?; + socket2_socket.set_read_timeout(Some(self.read_timeout))?; + socket2_socket.set_nonblocking(true)?; + socket2_socket.set_tcp_keepalive( + &TcpKeepalive::new().with_time(Duration::from_secs(SOCKET_KEEP_ALIVE_SECS)), + )?; + socket2_socket.set_send_buffer_size(READ_BUF_SIZE)?; + socket2_socket.set_recv_buffer_size(2 * READ_BUF_SIZE)?; + + let tokio_socket = TcpSocket::from_std_stream(socket2_socket.into()); + + // TODO: remove block_on + let stream = self + .runtimes + .clone() + .unwrap() + .default_runtime + .block_on(async move { + tokio_socket + .connect(addr) + .await + .map_err(|e| { + error!("Builder::build fail to connect to {}, err: {}.", addr, e); + e + }) + .unwrap() + }); debug!("Builder::build succeeds in connecting to {}.", addr); - stream.set_nodelay(true)?; - stream.set_read_timeout(Some(self.read_timeout))?; - stream.set_nonblocking(false)?; - stream.set_keepalive(Some(Duration::from_secs(SOCKET_KEEP_ALIVE_SECS)))?; - stream.set_send_buffer_size(READ_BUF_SIZE)?; - stream.set_recv_buffer_size(2 * READ_BUF_SIZE)?; - - let result = Connection::internal_new(id, addr, stream); + let result = Connection::internal_new(id, addr, stream, self.runtimes.unwrap()); OBKV_RPC_METRICS.observe_rpc_duration("connect", start.elapsed()); @@ -934,7 +951,7 @@ mod test { let res = conn .send(packet, channel_id) .expect("fail to send request") - .recv(); + .try_recv(); assert!(res.is_ok()); assert!(conn.close().is_ok()); } diff --git a/tests/test_table_client_base.rs b/tests/test_table_client_base.rs index 7d94351..30c5dc7 100644 --- a/tests/test_table_client_base.rs +++ b/tests/test_table_client_base.rs @@ -34,7 +34,7 @@ pub struct BaseTest { } impl BaseTest { - const ROW_NUM: usize = 500; + const ROW_NUM: usize = 400; const THREAD_NUM: usize = 10; pub fn new(client: ObTableClient) -> BaseTest { diff --git a/ycsb-rs/src/obkv_client.rs b/ycsb-rs/src/obkv_client.rs index bd59d59..4da8342 100644 --- a/ycsb-rs/src/obkv_client.rs +++ b/ycsb-rs/src/obkv_client.rs @@ -49,6 +49,9 @@ pub struct OBKVClientInitStruct { pub max_conns_per_server: usize, pub min_idle_conns_per_server: usize, pub conn_init_thread_num: usize, + + pub conn_reader_threads: usize, + pub conn_writer_threads: usize, } impl OBKVClientInitStruct { @@ -69,6 +72,8 @@ impl OBKVClientInitStruct { max_conns_per_server: props.max_conns_per_server, min_idle_conns_per_server: props.min_idle_conns_per_server, conn_init_thread_num: props.conn_init_thread_num, + conn_reader_threads: props.conn_reader_threads, + conn_writer_threads: props.conn_writer_threads, } } } @@ -90,6 +95,8 @@ impl OBKVClient { max_conns_per_server: config.max_conns_per_server, min_idle_conns_per_server: config.min_idle_conns_per_server, conn_init_thread_num: config.conn_init_thread_num, + conn_reader_threads: config.conn_reader_threads, + conn_writer_threads: config.conn_writer_threads, ..Default::default() }; let builder = Builder::new() @@ -155,6 +162,7 @@ impl DB for OBKVClient { vec![Value::from(key)], COLUMN_NAMES.iter().map(|s| s.to_string()).collect(), ); + assert!(result.is_ok()); assert_eq!(10, result?.len()); Ok(()) diff --git a/ycsb-rs/src/properties.rs b/ycsb-rs/src/properties.rs index 52ac18c..980fb9c 100644 --- a/ycsb-rs/src/properties.rs +++ b/ycsb-rs/src/properties.rs @@ -104,6 +104,14 @@ fn conn_init_thread_num_default() -> usize { 2 } +fn conn_reader_threads_default() -> usize { + 6 +} + +fn conn_writer_threads_default() -> usize { + 4 +} + #[derive(Deserialize, Debug)] pub struct Properties { #[serde(default = "zero_u64", rename = "insertstart")] @@ -201,4 +209,14 @@ pub struct Properties { rename = "conn_init_thread_num" )] pub conn_init_thread_num: usize, + #[serde( + default = "conn_reader_threads_default", + rename = "conn_reader_threads" + )] + pub conn_reader_threads: usize, + #[serde( + default = "conn_writer_threads_default", + rename = "conn_writer_threads" + )] + pub conn_writer_threads: usize, } diff --git a/ycsb-rs/workloads/workload_obkv.toml b/ycsb-rs/workloads/workload_obkv.toml index c2e514e..b456026 100644 --- a/ycsb-rs/workloads/workload_obkv.toml +++ b/ycsb-rs/workloads/workload_obkv.toml @@ -39,7 +39,7 @@ test_sys_user_name = "" test_sys_password = "" # How may YCSB Client will use a OBKV Client -obkv_client_reuse = 200 +obkv_client_reuse = 400 rpc_connect_timeout = 5000 rpc_read_timeout = 3000 @@ -48,6 +48,9 @@ rpc_operation_timeout = 3000 rpc_retry_limit = 3 rpc_retry_interval = 0 refresh_workers_num = 1 -max_conns_per_server = 5 -min_idle_conns_per_server = 5 +max_conns_per_server = 10 +min_idle_conns_per_server = 10 conn_init_thread_num = 1 + +conn_reader_threads = 6 +conn_writer_threads = 4 From 0fffa9f39499cd2aadb397d11a7ec314a842d01d Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Thu, 25 May 2023 15:23:00 +0800 Subject: [PATCH 3/5] [Fix] review --- src/client/mod.rs | 8 ++++-- src/client/table_client.rs | 20 +++++++------- src/rpc/mod.rs | 53 ++++++++++++++++++++++++-------------- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 3bf3e67..ee4865a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -159,6 +159,8 @@ pub struct ClientConfig { pub conn_writer_threads: usize, pub default_threads_num: usize, + pub conn_sender_mpsc_channel: usize, + pub log_level_flag: u16, } @@ -203,10 +205,12 @@ impl Default for ClientConfig { conn_init_thread_num: 2, query_concurrency_limit: None, - conn_reader_threads: 6, - conn_writer_threads: 4, + conn_reader_threads: 4, + conn_writer_threads: 2, default_threads_num: 2, + conn_sender_mpsc_channel: 100, + log_level_flag: DEFAULT_FLAG, } } diff --git a/src/client/table_client.rs b/src/client/table_client.rs index e63f0ca..2bddf4a 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -177,7 +177,6 @@ pub enum RunningMode { type Lock = Mutex; // ObTableClient inner implemetation. -#[allow(dead_code)] struct ObTableClientInner { location: ObTableLocation, ocp_manager: ObOcpModelManager, @@ -196,22 +195,22 @@ struct ObTableClientInner { closed: AtomicBool, status_mutex: Lock, - /// Client Runtimes + // Client Runtimes runtimes: RuntimesRef, - //ServerAddr(all) -> ObTableConnection + // ServerAddr(all) -> ObTableConnection table_roster: RwLock>>, server_roster: ServerRoster, running_mode: RunningMode, - //TableName -> TableEntry + // TableName -> TableEntry table_locations: RwLock>>, table_mutexs: RwLock>>, - //TableName -> rowKey element + // TableName -> rowKey element table_row_key_element: RwLock>>, connection_pools: RwLock>>, _retry_on_change_master: bool, - //TableName -> failure counter + // TableName -> failure counter table_continuous_failures: RwLock>>, refresh_metadata_mutex: Lock, @@ -452,7 +451,7 @@ impl ObTableClientInner { result.push((part_id, table.clone())); continue; } - //Table not found, try to refresh it and retry get it again. + // Table not found, try to refresh it and retry get it again. warn!("ObTableClientInner::get_tables can not get ob table by address {:?} so that will sync refresh metadata.", replica_location.addr()); self.sync_refresh_metadata()?; @@ -608,7 +607,8 @@ impl ObTableClientInner { .user_name(&self.user_name) .database_name(&self.database) .password(&self.password) - .runtimes(self.runtimes.clone()); + .runtimes(self.runtimes.clone()) + .sender_channel_size(self.config.conn_sender_mpsc_channel); let pool = Arc::new( ConnPoolBuilder::new() @@ -904,7 +904,7 @@ impl ObTableClientInner { refresh: bool, blocking: bool, ) -> Result> { - //Attempt to retrieve it from cache, avoid locking. + // Attempt to retrieve it from cache, avoid locking. if let Some(table_entry) = self.get_table_entry_from_cache(table_name) { //If the refresh is false indicates that user tolerate not the latest data if !refresh || !self.need_refresh_table_entry(&table_entry) { @@ -912,7 +912,7 @@ impl ObTableClientInner { } } - //Table entry is none or not refresh + // Table entry is none or not refresh let table_mutex = { let table_mutexs = self.table_mutexs.rl(); match table_mutexs.get(table_name) { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 261b4db..c93c290 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -42,7 +42,7 @@ use tokio::{ TcpSocket, TcpStream, }, sync::{mpsc, oneshot}, - time::{timeout as TokioTimeout, Duration as TokioDuration}, + time::Duration as TokioDuration, }; use tokio_util::codec::{Decoder, Encoder}; use uuid::Uuid; @@ -87,9 +87,10 @@ impl ConnectionSender { active: Arc, sender_runtime: RuntimeRef, default_runtime: RuntimeRef, + channel_capacity: usize, ) -> ConnectionSender { let (sender, mut receiver): (mpsc::Sender, mpsc::Receiver) = - mpsc::channel(100); + mpsc::channel(channel_capacity); let mut codec = ObTablePacketCodec::new(); let writer = sender_runtime.spawn(async move { @@ -107,7 +108,7 @@ impl ConnectionSender { if packet.is_close_poison() { break; } - //clear the buf for reuse + // clear the buf for reuse buf.clear(); let channel_id = packet.channel_id(); match codec.encode(packet, &mut buf) { @@ -157,7 +158,7 @@ impl ConnectionSender { active.store(false, Ordering::Release); if let Err(err) = write_stream.shutdown().await { - error!("Fail to close write stream to {}, err:{}", addr, err); + error!("Fail to close write stream to {addr}, err:{err}"); } drop(receiver); @@ -245,6 +246,7 @@ impl Connection { addr: SocketAddr, stream: TcpStream, runtimes: RuntimesRef, + channel_capacity: usize, ) -> Result { let requests: RequestsMap = Arc::new(Mutex::new(HashMap::new())); let read_requests = requests.clone(); @@ -277,6 +279,7 @@ impl Connection { active.clone(), runtimes.writer_runtime.clone(), runtimes.default_runtime.clone(), + channel_capacity, ), requests, continuous_timeout_failures: AtomicUsize::new(0), @@ -307,7 +310,7 @@ impl Connection { let mut buf = BytesMut::with_capacity(READ_BUF_SIZE); loop { if let Ok(()) = signal_receiver.try_recv() { - error!("DEBUG: out0"); + debug!("Connection::process_reading_data signal_receiver.try_recv() quit"); break; } @@ -367,10 +370,7 @@ impl Connection { CommonErrCode::Rpc, "connection reader exits".to_owned(), ))) { - error!( - "Connection::cancel_requests: fail to send cancel message, err:{}", - e.err().unwrap().to_string() - ); + error!("Connection::cancel_requests: fail to send cancel message, err:{e:?}"); } } } @@ -540,13 +540,13 @@ impl Connection { let rx = self.send(req, channel_id)?; if payload.timeout_millis() == 0 { - // no-wait request,return Ok directly2 + // no-wait request,return Ok directly return Ok(()); } // TODO: remove block_on with rx.await let resp = self.runtimes.default_runtime.block_on(async move { - match TokioTimeout(timeout, rx).await { + match tokio::time::timeout(timeout, rx).await { Ok(resp) => { self.on_recv_in_time(); resp.map_err(|e| { @@ -569,7 +569,7 @@ impl Connection { format!("wait for rpc response timeout, err:{err}"), )); } - }.expect("Tokio timeout panics, may be there is no current timer set") + }.map_err(|err| CommonErr(CommonErrCode::Rpc, format!("Tokio timeout error: {err:?}")))? }); match resp { @@ -605,9 +605,10 @@ impl Connection { CommonErrCode::Rpc, format!("transport code: [{code:?}], error: [{error}]"), )), - _other => { - panic!("Connection::execute unexpected response packet here."); - } + _other => Err(CommonErr( + CommonErrCode::Rpc, + "Connection::execute unexpected response packet.".parse()?, + )), } } @@ -691,13 +692,13 @@ impl Connection { } self.set_active(false); - //1. close writer + // 1. close writer if let Err(e) = self.sender.close() { error!("Connection::close fail to close writer, err: {}.", e); } - //2. close reader - //TODO: remove block_on + // 2. close reader + // TODO: remove block_on if let Err(e) = self.runtimes.default_runtime.block_on(async { self.reader_signal_sender .send(()) @@ -782,6 +783,8 @@ pub struct Builder { password: String, runtimes: Option, + + sender_channel_size: usize, } const SOCKET_KEEP_ALIVE_SECS: u64 = 15 * 60; @@ -800,6 +803,7 @@ impl Builder { database_name: "".to_owned(), password: "".to_owned(), runtimes: None, + sender_channel_size: 100, } } @@ -858,6 +862,11 @@ impl Builder { self } + pub fn sender_channel_size(mut self, size: usize) -> Self { + self.sender_channel_size = size; + self + } + pub fn build(self) -> Result { let uuid = Uuid::new_v4(); let id = BigEndian::read_u32(uuid.as_bytes()); @@ -903,7 +912,13 @@ impl Builder { debug!("Builder::build succeeds in connecting to {}.", addr); - let result = Connection::internal_new(id, addr, stream, self.runtimes.unwrap()); + let result = Connection::internal_new( + id, + addr, + stream, + self.runtimes.unwrap(), + self.sender_channel_size, + ); OBKV_RPC_METRICS.observe_rpc_duration("connect", start.elapsed()); From 8b7dedf88a8dccbcfc885a3c4fa8b2dc3a3502c7 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Mon, 29 May 2023 17:23:32 +0800 Subject: [PATCH 4/5] [Fix] review --- src/client/mod.rs | 12 ++++++------ src/client/table_client.rs | 16 +++++++++++----- src/rpc/mod.rs | 2 -- ycsb-rs/src/obkv_client.rs | 12 ++++++------ ycsb-rs/src/properties.rs | 16 ++++++++-------- ycsb-rs/workloads/workload_obkv.toml | 4 ++-- 6 files changed, 33 insertions(+), 29 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index ee4865a..46317c2 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -155,9 +155,9 @@ pub struct ClientConfig { pub conn_init_thread_num: usize, pub query_concurrency_limit: Option, - pub conn_reader_threads: usize, - pub conn_writer_threads: usize, - pub default_threads_num: usize, + pub conn_reader_thread_num: usize, + pub conn_writer_thread_num: usize, + pub default_thread_num: usize, pub conn_sender_mpsc_channel: usize, @@ -205,9 +205,9 @@ impl Default for ClientConfig { conn_init_thread_num: 2, query_concurrency_limit: None, - conn_reader_threads: 4, - conn_writer_threads: 2, - default_threads_num: 2, + conn_reader_thread_num: 4, + conn_writer_thread_num: 2, + default_thread_num: 2, conn_sender_mpsc_channel: 100, diff --git a/src/client/table_client.rs b/src/client/table_client.rs index 2bddf4a..3cd8885 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -1502,8 +1502,8 @@ pub struct ObClientRuntimes { impl ObClientRuntimes { pub fn test_default() -> ObClientRuntimes { ObClientRuntimes { - reader_runtime: Arc::new(build_runtime("ob-conn-read", 1)), - writer_runtime: Arc::new(build_runtime("ob-conn-write", 1)), + reader_runtime: Arc::new(build_runtime("ob-conn-reader", 1)), + writer_runtime: Arc::new(build_runtime("ob-conn-writer", 1)), default_runtime: Arc::new(build_runtime("ob-default", 1)), } } @@ -1520,9 +1520,15 @@ fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { fn build_obkv_runtimes(config: &ClientConfig) -> ObClientRuntimes { ObClientRuntimes { - reader_runtime: Arc::new(build_runtime("ob-conn-read", config.conn_reader_threads)), - writer_runtime: Arc::new(build_runtime("ob-conn-write", config.conn_writer_threads)), - default_runtime: Arc::new(build_runtime("ob-default", config.default_threads_num)), + reader_runtime: Arc::new(build_runtime( + "ob-conn-reader", + config.conn_reader_thread_num, + )), + writer_runtime: Arc::new(build_runtime( + "ob-conn-writer", + config.conn_writer_thread_num, + )), + default_runtime: Arc::new(build_runtime("ob-default", config.default_thread_num)), } } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index c93c290..3a33b38 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -359,8 +359,6 @@ impl Connection { } } } - // TODO: stream close - drop(read_stream); } fn cancel_requests(requests: &RequestsMap) { diff --git a/ycsb-rs/src/obkv_client.rs b/ycsb-rs/src/obkv_client.rs index 4da8342..b6e558c 100644 --- a/ycsb-rs/src/obkv_client.rs +++ b/ycsb-rs/src/obkv_client.rs @@ -50,8 +50,8 @@ pub struct OBKVClientInitStruct { pub min_idle_conns_per_server: usize, pub conn_init_thread_num: usize, - pub conn_reader_threads: usize, - pub conn_writer_threads: usize, + pub conn_reader_thread_num: usize, + pub conn_writer_thread_num: usize, } impl OBKVClientInitStruct { @@ -72,8 +72,8 @@ impl OBKVClientInitStruct { max_conns_per_server: props.max_conns_per_server, min_idle_conns_per_server: props.min_idle_conns_per_server, conn_init_thread_num: props.conn_init_thread_num, - conn_reader_threads: props.conn_reader_threads, - conn_writer_threads: props.conn_writer_threads, + conn_reader_thread_num: props.conn_reader_thread_num, + conn_writer_thread_num: props.conn_writer_thread_num, } } } @@ -95,8 +95,8 @@ impl OBKVClient { max_conns_per_server: config.max_conns_per_server, min_idle_conns_per_server: config.min_idle_conns_per_server, conn_init_thread_num: config.conn_init_thread_num, - conn_reader_threads: config.conn_reader_threads, - conn_writer_threads: config.conn_writer_threads, + conn_reader_thread_num: config.conn_reader_thread_num, + conn_writer_thread_num: config.conn_writer_thread_num, ..Default::default() }; let builder = Builder::new() diff --git a/ycsb-rs/src/properties.rs b/ycsb-rs/src/properties.rs index 980fb9c..369e297 100644 --- a/ycsb-rs/src/properties.rs +++ b/ycsb-rs/src/properties.rs @@ -104,11 +104,11 @@ fn conn_init_thread_num_default() -> usize { 2 } -fn conn_reader_threads_default() -> usize { +fn conn_reader_thread_num_default() -> usize { 6 } -fn conn_writer_threads_default() -> usize { +fn conn_writer_thread_num_default() -> usize { 4 } @@ -210,13 +210,13 @@ pub struct Properties { )] pub conn_init_thread_num: usize, #[serde( - default = "conn_reader_threads_default", - rename = "conn_reader_threads" + default = "conn_reader_thread_num_default", + rename = "conn_reader_thread_num" )] - pub conn_reader_threads: usize, + pub conn_reader_thread_num: usize, #[serde( - default = "conn_writer_threads_default", - rename = "conn_writer_threads" + default = "conn_writer_thread_num_default", + rename = "conn_writer_thread_num" )] - pub conn_writer_threads: usize, + pub conn_writer_thread_num: usize, } diff --git a/ycsb-rs/workloads/workload_obkv.toml b/ycsb-rs/workloads/workload_obkv.toml index b456026..1b33790 100644 --- a/ycsb-rs/workloads/workload_obkv.toml +++ b/ycsb-rs/workloads/workload_obkv.toml @@ -52,5 +52,5 @@ max_conns_per_server = 10 min_idle_conns_per_server = 10 conn_init_thread_num = 1 -conn_reader_threads = 6 -conn_writer_threads = 4 +conn_reader_thread_num = 6 +conn_writer_thread_num = 4 From 00583e5e59ff4a58744f3368936f2395bec7f617 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Mon, 29 May 2023 20:34:20 +0800 Subject: [PATCH 5/5] [Fix] review --- src/client/mod.rs | 4 ++-- src/client/table_client.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 46317c2..7283ddd 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -159,7 +159,7 @@ pub struct ClientConfig { pub conn_writer_thread_num: usize, pub default_thread_num: usize, - pub conn_sender_mpsc_channel: usize, + pub max_inflight_reqs_per_conn: usize, pub log_level_flag: u16, } @@ -209,7 +209,7 @@ impl Default for ClientConfig { conn_writer_thread_num: 2, default_thread_num: 2, - conn_sender_mpsc_channel: 100, + max_inflight_reqs_per_conn: 100, log_level_flag: DEFAULT_FLAG, } diff --git a/src/client/table_client.rs b/src/client/table_client.rs index 3cd8885..c43862b 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -608,7 +608,7 @@ impl ObTableClientInner { .database_name(&self.database) .password(&self.password) .runtimes(self.runtimes.clone()) - .sender_channel_size(self.config.conn_sender_mpsc_channel); + .sender_channel_size(self.config.max_inflight_reqs_per_conn); let pool = Arc::new( ConnPoolBuilder::new()