-
Notifications
You must be signed in to change notification settings - Fork 19
Async Connection Layer #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This MR is the premise of implementing the asynchronous API(#16) |
LGTM |
src/rpc/mod.rs
Outdated
@@ -544,42 +540,45 @@ impl Connection { | |||
let rx = self.send(req, channel_id)?; | |||
|
|||
if payload.timeout_millis() == 0 { | |||
// no-wait request,return Ok directly | |||
// no-wait request,return Ok directly2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
directly2 -> directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
if let Err(e) = sender.send(Err(CommonErr( | ||
CommonErrCode::Rpc, | ||
"connection reader exits".to_owned(), | ||
))) { | ||
error!( | ||
"Connection::cancel_requests: fail to send cancel message, err:{}", | ||
e | ||
e.err().unwrap().to_string() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this unwrap safe? Can we just print {e:?}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
@@ -544,42 +540,45 @@ impl Connection { | |||
let rx = self.send(req, channel_id)?; | |||
|
|||
if payload.timeout_millis() == 0 { | |||
// no-wait request,return Ok directly | |||
// no-wait request,return Ok directly2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// no-wait request,return Ok directly2 | |
// no-wait request,return Ok directly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
error!( | ||
// TODO: remove block_on with rx.await | ||
let resp = self.runtimes.default_runtime.block_on(async move { | ||
match TokioTimeout(timeout, rx).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When use a free function from a module, just use <module_name>::<function_name>
:
match TokioTimeout(timeout, rx).await { | |
match tokio::time::timeout(timeout, rx).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
format!("wait for rpc response timeout, err:{err}"), | ||
)); | ||
} | ||
}.expect("Tokio timeout panics, may be there is no current timer set") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a library, throw up the error instead of panic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
//2. close reader | ||
if let Err(e) = self | ||
.reader_signal_sender | ||
.send(()) | ||
.map_err(ConnectionSender::broken_pipe) | ||
{ | ||
//TODO: remove block_on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// 2. close reader
// TODO: remove block_on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -54,6 +54,7 @@ serde = "1.0" | |||
serde_bytes = "0.11" | |||
serde_derive = "1.0" | |||
serde_json = "1.0" | |||
socket2 = "0.5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find that what the tokio::net
module uses is just socket2
, so I guess there is no need to import it explicitly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To set some parameters like "tcp_keepalive", and "buffer_size"..., it turns out that we have to use socket2 to define a socket that owns those properties to create a Tokio TCP stream. Socket2 is required explicitly, in order to customize a TCP stream.
Learn more from rust-lang/rust#69774 and rust-lang
src/client/mod.rs
Outdated
conn_reader_threads: 6, | ||
conn_writer_threads: 4, | ||
default_threads_num: 2, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not keep them in the same style? And how about setting all these runtime's thread number is 2 by default?
conn_reader_threads: 6, | |
conn_writer_threads: 4, | |
default_threads_num: 2, | |
conn_reader_thread_num: 2, | |
conn_writer_thread_num: 2, | |
default_thread_num: 2, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I could modify it into 422. The reason why I set reader threads into 4 is that there are more "read/get tasks" in practice. After I test this client on the YCSB, the number of reader threads and the number of connections are the bottleneck of the client. So 422 is a better choice, which implies that the count of reader threads should be more than others.
/// OBKV Table client | ||
#[derive(Clone)] | ||
#[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why dead code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dead_code is for runtimes. Runtimes that we add in this commit are for async refresh_thread_pool, which have not been implemented yet.
src/client/table_client.rs
Outdated
@@ -193,6 +196,9 @@ struct ObTableClientInner { | |||
closed: AtomicBool, | |||
status_mutex: Lock, | |||
|
|||
/// Client Runtimes | |||
runtimes: RuntimesRef, | |||
|
|||
//ServerAddr(all) -> ObTableConnection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"//" -> "// "
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
) -> ConnectionSender { | ||
let (sender, receiver): (Sender<ObTablePacket>, Receiver<ObTablePacket>) = unbounded(); | ||
let (sender, mut receiver): (mpsc::Sender<ObTablePacket>, mpsc::Receiver<ObTablePacket>) = | ||
mpsc::channel(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
break; | ||
} | ||
} | ||
} | ||
|
||
active.store(false, Ordering::Release); | ||
|
||
if let Err(err) = write_stream.shutdown(Shutdown::Write) { | ||
if let Err(err) = write_stream.shutdown().await { | ||
error!("Fail to close write stream to {}, err:{}", addr, err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I know, such as error!("Fail to close write stream to {addr}, err:{err}")
is supported now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
@@ -302,12 +307,13 @@ impl Connection { | |||
let mut buf = BytesMut::with_capacity(READ_BUF_SIZE); | |||
loop { | |||
if let Ok(()) = signal_receiver.try_recv() { | |||
error!("DEBUG: out0"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it to debug! and refactor the logged message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
); | ||
} | ||
// TODO: stream close | ||
drop(read_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not shutdown like write_stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From document of OwnedWriteHalf, shutdown is the interface of AsyncWrite, it also mentions that "Dropping the write half will also shut down the write half of the TCP stream. ".
read_stream is an OwnedReadHalf, which means it does not have the shutdown interface. Learn more from OwnedReadHalf
By the way, it seems that tokio users never care about GC on stream. It is still a problem, so I leave a TODO here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to drop I think, when the function end, it will be dropped automatically.
In fact this is the classic RAII.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix review
@@ -54,6 +54,7 @@ serde = "1.0" | |||
serde_bytes = "0.11" | |||
serde_derive = "1.0" | |||
serde_json = "1.0" | |||
socket2 = "0.5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To set some parameters like "tcp_keepalive", and "buffer_size"..., it turns out that we have to use socket2 to define a socket that owns those properties to create a Tokio TCP stream. Socket2 is required explicitly, in order to customize a TCP stream.
Learn more from rust-lang/rust#69774 and rust-lang
src/client/mod.rs
Outdated
conn_reader_threads: 6, | ||
conn_writer_threads: 4, | ||
default_threads_num: 2, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I could modify it into 422. The reason why I set reader threads into 4 is that there are more "read/get tasks" in practice. After I test this client on the YCSB, the number of reader threads and the number of connections are the bottleneck of the client. So 422 is a better choice, which implies that the count of reader threads should be more than others.
src/client/table_client.rs
Outdated
@@ -193,6 +196,9 @@ struct ObTableClientInner { | |||
closed: AtomicBool, | |||
status_mutex: Lock, | |||
|
|||
/// Client Runtimes | |||
runtimes: RuntimesRef, | |||
|
|||
//ServerAddr(all) -> ObTableConnection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/// OBKV Table client | ||
#[derive(Clone)] | ||
#[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dead_code is for runtimes. Runtimes that we add in this commit are for async refresh_thread_pool, which have not been implemented yet.
src/rpc/mod.rs
Outdated
if let Err(e) = sender.send(Err(CommonErr( | ||
CommonErrCode::Rpc, | ||
"connection reader exits".to_owned(), | ||
))) { | ||
error!( | ||
"Connection::cancel_requests: fail to send cancel message, err:{}", | ||
e | ||
e.err().unwrap().to_string() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
//2. close reader | ||
if let Err(e) = self | ||
.reader_signal_sender | ||
.send(()) | ||
.map_err(ConnectionSender::broken_pipe) | ||
{ | ||
//TODO: remove block_on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
@@ -302,12 +307,13 @@ impl Connection { | |||
let mut buf = BytesMut::with_capacity(READ_BUF_SIZE); | |||
loop { | |||
if let Ok(()) = signal_receiver.try_recv() { | |||
error!("DEBUG: out0"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
); | ||
} | ||
// TODO: stream close | ||
drop(read_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From document of OwnedWriteHalf, shutdown is the interface of AsyncWrite, it also mentions that "Dropping the write half will also shut down the write half of the TCP stream. ".
read_stream is an OwnedReadHalf, which means it does not have the shutdown interface. Learn more from OwnedReadHalf
By the way, it seems that tokio users never care about GC on stream. It is still a problem, so I leave a TODO here.
src/rpc/mod.rs
Outdated
) -> ConnectionSender { | ||
let (sender, receiver): (Sender<ObTablePacket>, Receiver<ObTablePacket>) = unbounded(); | ||
let (sender, mut receiver): (mpsc::Sender<ObTablePacket>, mpsc::Receiver<ObTablePacket>) = | ||
mpsc::channel(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rpc/mod.rs
Outdated
break; | ||
} | ||
} | ||
} | ||
|
||
active.store(false, Ordering::Release); | ||
|
||
if let Err(err) = write_stream.shutdown(Shutdown::Write) { | ||
if let Err(err) = write_stream.shutdown().await { | ||
error!("Fail to close write stream to {}, err:{}", addr, err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix review
src/client/mod.rs
Outdated
pub conn_reader_threads: usize, | ||
pub conn_writer_threads: usize, | ||
pub default_threads_num: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we keep the names of these three config options in the same style:
pub conn_reader_threads: usize, | |
pub conn_writer_threads: usize, | |
pub default_threads_num: usize, | |
pub conn_reader_thread_num: usize, | |
pub conn_writer_thread_num: usize, | |
pub default_thread_num: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -34,7 +34,7 @@ pub struct BaseTest { | |||
} | |||
|
|||
impl BaseTest { | |||
const ROW_NUM: usize = 500; | |||
const ROW_NUM: usize = 400; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why modify to 400?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the client and the observer are not in the same place, the latency will make this test spend too much time. Reduce the count of rows to reduce the runtime.
src/rpc/mod.rs
Outdated
); | ||
} | ||
// TODO: stream close | ||
drop(read_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to drop I think, when the function end, it will be dropped automatically.
In fact this is the classic RAII.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix review
@@ -34,7 +34,7 @@ pub struct BaseTest { | |||
} | |||
|
|||
impl BaseTest { | |||
const ROW_NUM: usize = 500; | |||
const ROW_NUM: usize = 400; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the client and the observer are not in the same place, the latency will make this test spend too much time. Reduce the count of rows to reduce the runtime.
src/rpc/mod.rs
Outdated
); | ||
} | ||
// TODO: stream close | ||
drop(read_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, done.
src/client/mod.rs
Outdated
pub conn_reader_threads: usize, | ||
pub conn_writer_threads: usize, | ||
pub default_threads_num: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/client/mod.rs
Outdated
pub conn_writer_thread_num: usize, | ||
pub default_thread_num: usize, | ||
|
||
pub conn_sender_mpsc_channel: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need a more readable name for this option:
pub conn_sender_mpsc_channel: usize, | |
pub max_reqs_in_flight_per_conn: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, using max_inflight_reqs_per_conn instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What changes were proposed in this pull request?
Why are the changes needed?
For better performance.
Will break the compatibility? How if so?
Users will currently not be aware of asynchronous changes in the connection layer, and they will use the existing code without changing it.
Does this PR introduce any user-facing change?
Users will currently not be aware of asynchronous changes in the connection layer, and they will use the existing code without changing it.
How was this patch tested?
3.2.4
Checklist