[Feat] Async Interface #52

Merged
merged 9 commits into from
Jun 13, 2023

Conversation

Contributor

@IHEII IHEII commented Jun 6, 2023

Task Description

  • Implement the asynchronous interface for the OBKV client.

Solution Description

Passed Regressions

  • Pass all tests in 3.x

Upgrade Compatibility

  • Synchronous APIs may no longer be provided, but users can find them on the SyncInterface branch. We are considering adding synchronous APIs to the client.

Other Information

Release Note

  • Implement async APIs

@IHEII IHEII requested review from WeiXinChan and shenyunlong June 6, 2023 08:08
Comment on lines 401 to 404
if self.closed {
} else {
error!("QueryStreamResult::close fail")
}
Contributor

Suggested change
- if self.closed {
- } else {
-     error!("QueryStreamResult::close fail")
- }
+ if !self.closed {
+     error!("QueryStreamResult::drop stream is not closed when drop")
+ }

Contributor Author

Fixed
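
The check suggested above can be sketched in isolation. This is a minimal illustration, not the PR's code: the struct is a hypothetical stand-in that models only the `closed` flag, `eprintln!` replaces the `error!` logging macro, and `drop_warning` is a helper invented here so the condition can be tested.

```rust
/// Hypothetical stand-in for the PR's `QueryStreamResult`; only the
/// `closed` flag from the reviewed snippet is modeled.
struct QueryStreamResult {
    closed: bool,
}

impl QueryStreamResult {
    fn new() -> Self {
        Self { closed: false }
    }

    /// Marks the stream as closed. The real method would also release
    /// server-side resources.
    fn close(&mut self) {
        self.closed = true;
    }

    /// Returns the message to log on drop, or `None` if the stream was
    /// closed properly. Hypothetical helper, split out for testing.
    fn drop_warning(&self) -> Option<&'static str> {
        if !self.closed {
            Some("QueryStreamResult::drop stream is not closed when drop")
        } else {
            None
        }
    }
}

impl Drop for QueryStreamResult {
    fn drop(&mut self) {
        // `eprintln!` stands in for the `error!` logging macro.
        if let Some(msg) = self.drop_warning() {
            eprintln!("{}", msg);
        }
    }
}
```

Inverting the condition removes the empty `if` branch and makes the log message describe what actually went wrong: the stream was dropped without being closed.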


impl Iterator for QueryResultSet {
type Item = Result<HashMap<String, Value>>;
pub async fn async_close(&mut self) -> Result<()> {
Contributor

Why not remove the close method and rename this method to close?

Contributor Author

Fixed. Only the async close is kept now.

}

pub fn operation_timeout(&self) -> Duration {
self.config.rpc_operation_timeout
}

fn execute(
/// Execute a batch operation on a table
pub async fn execute_batch(
Contributor

I guess we should move this method to the obclient if all the other methods are removed from the ObTable.

Contributor Author

@IHEII IHEII Jun 9, 2023

Only the methods used by ObTableClient are kept now: execute_batch() is used for batch operations, and execute() is used for regular operations.
When we call get(), we actually fetch the data from an observer. ObTable is an abstraction of the observer, so every request needs to be executed by an ObTable.

}

#[allow(dead_code)]
// impl ObTableStreamQuerier for obtable
Contributor

Can these traits, including ObTableStreamQuerier, Table and StreamQuerier, be removed? As I remember, each of them has only one implementation, and they are too complex to mock if we want to use them in unit tests.

Contributor Author

Fixed. Only ObTableClientStreamQuerier is kept now, and it is renamed to StreamQuerier to keep the code clean.

@@ -249,7 +246,7 @@ impl ConnPool {
Ok(())
}

pub fn get(&self) -> Result<Arc<Connection>> {
pub async fn get(&self) -> Result<Arc<Connection>> {
Contributor

Is this async necessary? No await is found in the code block of get.

Contributor Author

Fixed
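
To illustrate the reviewer's point, here is a small sketch showing that an `async fn` whose body contains no `.await` completes on its first poll, so marking it `async` only adds a future wrapper. Everything here is hypothetical: `Connection`, `ConnPool`, `get_async`, `NoopWaker` and `poll_once` are simplified stand-ins built on the standard library, not the PR's types.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical connection and pool types, reduced to the bare minimum.
struct Connection {
    id: u32,
}

struct ConnPool {
    conns: Mutex<Vec<Arc<Connection>>>,
}

impl ConnPool {
    /// Synchronous version: nothing in the body suspends.
    fn get(&self) -> Option<Arc<Connection>> {
        self.conns.lock().unwrap().first().cloned()
    }

    /// Same body marked `async`: it contains no `.await`, so the
    /// returned future finishes the first time it is polled.
    async fn get_async(&self) -> Option<Arc<Connection>> {
        self.conns.lock().unwrap().first().cloned()
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and returns its output if it is ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Because both versions return the same value without ever yielding, the plain `fn` is the simpler choice and can be called from both sync and async code.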

@@ -319,42 +316,42 @@ struct SharedPool {
conn_builder: ConnBuilder,
inner: Mutex<PoolInner>,
cond: Condvar,
conn_init_thread_pool: Arc<ScheduledThreadPool>,
runtimes: RuntimesRef,
Contributor

Isn't providing one specific runtime enough? I guess we can call it bg_runtime.

Contributor Author

Fixed. We reduced the number of runtimes (see ObClientRuntimes) and use bg_runtime here.

src/rpc/mod.rs Outdated
@@ -679,24 +672,29 @@ impl Connection {
/// invalidated.
///
///For info on default settings see [Builder](struct.Builder.html)
pub fn new() -> Result<Connection> {
Builder::new().build()
pub async fn new() -> Result<Connection> {
Contributor

try_new is better.

Contributor Author

Fixed.

Comment on lines 1495 to 1498
batch_op_runtime: Arc::new(build_runtime(
"ob-batch-executor",
config.batch_op_thread_num,
)),
Contributor

I guess this runtime is not necessary in the async context. How about removing it?

Contributor Author

Fixed, we use bg_runtime to handle batch operations. A runtime is still needed to process those requests, since a batch operation may touch data on different partitions and servers. Hence, we need to send the packets concurrently and aggregate the results.
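
The fan-out and aggregation described above can be sketched as follows. This is only an illustration under stated assumptions: standard-library threads stand in for tasks spawned on bg_runtime, the `Op` type and the formatted "response" strings are hypothetical, and the `execute_batch` function here shares only its name with the PR's method.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical batch entry: (server address, key).
type Op = (String, String);

/// Groups operations by target server, runs each group concurrently,
/// and merges the per-server results into one sorted list.
fn execute_batch(ops: Vec<Op>) -> Vec<String> {
    // Partition the operations by the server that owns the data.
    let mut by_server: HashMap<String, Vec<String>> = HashMap::new();
    for (server, key) in ops {
        by_server.entry(server).or_default().push(key);
    }

    // One worker per server. In the PR these would be async tasks on
    // bg_runtime; threads are used here to stay within std.
    let handles: Vec<_> = by_server
        .into_iter()
        .map(|(server, keys)| {
            thread::spawn(move || {
                keys.into_iter()
                    .map(|key| format!("{}:{}", server, key))
                    .collect::<Vec<String>>()
            })
        })
        .collect();

    // Wait for every worker and aggregate the partial results.
    let mut results: Vec<String> = handles
        .into_iter()
        .flat_map(|handle| handle.join().expect("worker panicked"))
        .collect();
    results.sort();
    results
}
```

Sorting at the end only makes the merged output deterministic for the sketch; a real client would more likely restore the caller's original operation order.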

"ob-batch-executor",
config.batch_op_thread_num,
)),
query_runtime: Arc::new(build_runtime("ob-query-executor", config.query_thread_num)),
Contributor

Maybe we should use reader_runtime?

Contributor Author

Fixed.

config.batch_op_thread_num,
)),
query_runtime: Arc::new(build_runtime("ob-query-executor", config.query_thread_num)),
conn_init_runtime: Arc::new(build_runtime("ob-conn-initer", config.conn_init_thread_num)),
Contributor

Maybe what we need here is just a bg_runtime to do background work such as connection initialization and table location refreshing?

And replace default_runtime with the bg_runtime.

Contributor Author

Fixed. We only keep three necessary runtimes for different tasks. See ObClientRuntimes.

default_runtime: Arc::new(build_runtime("ob-default", config.default_thread_num)),
tcp_recv_runtime: Arc::new(build_runtime("ob-tcp-reviever", config.tcp_recv_thread_num)),
tcp_send_runtime: Arc::new(build_runtime("ob-tcp-sender", config.tcp_send_thread_num)),
bg_runtime: Arc::new(build_runtime("ob_bg", config.bg_thread_num)),
Contributor

Suggested change
- bg_runtime: Arc::new(build_runtime("ob_bg", config.bg_thread_num)),
+ bg_runtime: Arc::new(build_runtime("ob-bg", config.bg_thread_num)),

Contributor Author

Fixed

@ShiKaiWi
Contributor

LGTM

src/rpc/mod.rs Outdated
}

/// Shutdown the sender without closing remote
fn shutdown(&mut self) -> Result<()> {
let writer = mem::replace(&mut self.writer, None);
let drop_helper = AbortOnDropMany(vec![writer.unwrap()]);
drop(drop_helper);
Contributor

It seems there is no need to drop it manually.

Contributor Author

Fixed.

src/rpc/mod.rs Outdated
Comment on lines 191 to 193
fn shutdown(&mut self) -> Result<()> {
let writer = mem::replace(&mut self.writer, None);
let drop_helper = AbortOnDropMany(vec![writer.unwrap()]);
Contributor

I guess it will be better if you put it as:

if let Some(writer) = mem::take(&mut self.writer) {
   writer.abort();
}

Contributor Author

Fixed.

src/rpc/mod.rs Outdated
}
}

let reader = mem::replace(&mut self.reader, None);
Contributor

mem::take is better.

Contributor Author

Fixed.
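
For readers unfamiliar with the idiom, the sketch below shows what `mem::take` does on an `Option` field: it moves the value out and leaves `None` (the default) behind, which is exactly what `mem::replace(&mut x, None)` does with more typing. The `ConnectionReader` type and its `stop` method are hypothetical, and a `std::thread::JoinHandle` stands in for the async task handle used in the PR.

```rust
use std::mem;
use std::thread::{self, JoinHandle};

/// Hypothetical holder of an optional worker handle.
struct ConnectionReader {
    reader: Option<JoinHandle<u32>>,
}

impl ConnectionReader {
    fn spawn() -> Self {
        Self {
            reader: Some(thread::spawn(|| 42)),
        }
    }

    /// Takes the handle out of `self`, leaving `None` behind, and waits
    /// for the worker. Returns `None` if it was already stopped.
    fn stop(&mut self) -> Option<u32> {
        // Same effect as `mem::replace(&mut self.reader, None)`;
        // `self.reader.take()` is another equivalent spelling.
        let handle = mem::take(&mut self.reader)?;
        handle.join().ok()
    }
}
```

Calling `stop` twice is safe in this sketch: the second call finds `None` and returns early instead of panicking on an `unwrap()`.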

src/rpc/mod.rs Outdated
async fn close(&mut self) -> Result<()> {
self.request(ObTablePacket::ClosePoison).await?;
self.shutdown()
Contributor

If we want to close the ConnectionSender gracefully, I guess we should join the writer handle after sending the poison signal.

Contributor Author

Fixed. The writer handles the ObTablePacket::ClosePoison packet and then stops receiving packets from the mpsc::channel. The writer task then finishes on its own, so all we need to do is wait on its JoinHandle.
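
The graceful-close sequence discussed in this thread (send the poison packet, then wait for the writer to finish) can be sketched with the standard library alone. This is an analogy, not the PR's implementation: a thread and `std::sync::mpsc` replace the async task and channel, and the `Packet` enum, the byte counter, and the `String` errors are assumptions made for illustration. Only the `ClosePoison` name comes from the PR.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical packet type; only `ClosePoison` is named in the PR.
enum Packet {
    Data(Vec<u8>),
    ClosePoison,
}

struct ConnectionSender {
    sender: Sender<Packet>,
    writer: Option<JoinHandle<usize>>,
}

impl ConnectionSender {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Packet>();
        // Writer loop: counts the bytes it "writes" and exits as soon
        // as the poison packet arrives.
        let writer = thread::spawn(move || {
            let mut written = 0;
            for packet in receiver {
                match packet {
                    Packet::Data(bytes) => written += bytes.len(),
                    Packet::ClosePoison => break,
                }
            }
            written
        });
        Self {
            sender,
            writer: Some(writer),
        }
    }

    fn request(&self, packet: Packet) -> Result<(), String> {
        self.sender.send(packet).map_err(|e| e.to_string())
    }

    /// Graceful close: send the poison packet, then join the writer so
    /// everything queued before it has been processed.
    fn close(&mut self) -> Result<usize, String> {
        self.request(Packet::ClosePoison)?;
        match self.writer.take() {
            Some(handle) => handle.join().map_err(|_| "writer panicked".to_string()),
            None => Err("already closed".to_string()),
        }
    }
}
```

Joining, rather than aborting, guarantees that every packet queued ahead of the poison packet is handled before `close` returns.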

src/rpc/mod.rs Outdated
let drop_helper = AbortOnDropMany(vec![writer.unwrap()]);
drop(drop_helper);
if let Some(writer) = mem::take(&mut self.writer) {
let _drop_helper = AbortOnDropMany(vec![writer]);
Contributor

writer.abort() is enough, and it avoids the extra memory allocation for AbortOnDropMany.

Contributor Author

Fixed.

src/rpc/mod.rs Outdated
if let Some(writer) = mem::take(&mut self.writer) {
writer.await??
}
self.sender.closed().await;
Contributor

I guess there is no need to wait for the sender to be closed? What we care about is whether the TCP send worker has stopped.

Contributor Author

Fixed.

@IHEII IHEII merged commit e13051d into oceanbase:AsyncDev Jun 13, 2023
@IHEII IHEII deleted the async branch June 14, 2023 01:34
@IHEII IHEII restored the async branch June 14, 2023 01:34
@IHEII IHEII deleted the async branch June 14, 2023 06:41