Skip to content

Commit e13051d

Browse files
authored
[Feat] Async Interface (#52)
* [Feat] async client * [Feat] async client * [Fix] remove useless APIs * [Fix] modify runtime/query * [Feat] batch ycsb * [Fix] review * [Fix] connection close process * [Fix] remove trait Table & fix review * [Fix] review
1 parent 7bcf0c5 commit e13051d

29 files changed

+1866
-1869
lines changed

benches/concurrent_insert/mod.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
extern crate obkv;
1919

20-
use std::{sync::Arc, thread, time};
20+
use std::{sync::Arc, time};
2121

22-
use obkv::{serde_obkv::value::Value, Builder, ObTableClient, RunningMode, Table};
22+
use obkv::{serde_obkv::value::Value, Builder, ObTableClient, RunningMode};
23+
use tokio::task;
2324

2425
// TODO: use test conf to control which environments to test.
2526
const TEST_FULL_USER_NAME: &str = "test";
@@ -54,11 +55,11 @@ const TABLE_NAME: &str = "series_key_to_id_0";
5455
// PRIMARY KEY(series_key),
5556
// KEY index_id(series_id)
5657
// );
57-
fn concurrent_insert(client: Arc<ObTableClient>) {
58+
async fn concurrent_insert(client: Arc<ObTableClient>) {
5859
let mut thds = Vec::with_capacity(20);
5960
for i in 0..50 {
6061
let client = client.clone();
61-
let thd = thread::spawn(move || {
62+
let thd = task::spawn(async move {
6263
for j in i * 100..(i * 100 + 50) {
6364
let series_key = format!("series_key_test_padding_padding_{j}");
6465
let series_id = j * j;
@@ -69,6 +70,7 @@ fn concurrent_insert(client: Arc<ObTableClient>) {
6970
vec!["series_id".to_owned()],
7071
vec![Value::from(series_id as i64)],
7172
)
73+
.await
7274
.unwrap_or_else(|err| {
7375
panic!("fail to insert row:{series_key} {series_id}, err:{err}")
7476
});
@@ -78,18 +80,20 @@ fn concurrent_insert(client: Arc<ObTableClient>) {
7880
}
7981

8082
for (i, thd) in thds.into_iter().enumerate() {
81-
thd.join()
83+
thd.await
8284
.unwrap_or_else(|_| panic!("thread#{i} fail to join"));
8385
}
8486
}
8587

86-
fn main() {
87-
let client = build_client(RunningMode::Normal);
88+
#[tokio::main]
89+
async fn main() {
90+
let client_handle = task::spawn_blocking(|| build_client(RunningMode::Normal));
91+
let client = client_handle.await.unwrap();
8892
client
8993
.truncate_table(TABLE_NAME)
9094
.expect("fail to truncate the table");
9195
let start = time::Instant::now();
92-
concurrent_insert(Arc::new(client));
96+
concurrent_insert(Arc::new(client)).await;
9397
let elapsed = time::Instant::now() - start;
9498
println!("Benches::concurrent_insert cost time:{elapsed:?}");
9599
}

docs/simple_demo/simple_operation/demo.md

+20-27
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Demo for obkv-table-client-rs
2-
Edited by OBKV developers on March 3, 2023.
2+
Edited by OBKV developers on June 6, 2023.
33

44
## Introduction
55
obkv-table-client-rs is Rust Library that can access table data from OceanBase storage layer.
@@ -13,42 +13,40 @@ Now we provide an interface to access data from OceanBase, which we will introdu
1313
obkv-table-client-rs support several simple operations, such as get, insert, update, insert_or_update, replace, append, increment, delete.
1414

1515
```rust Table and ObTableClient
16-
impl Table for ObTableClient {
17-
// implement operation in Table
18-
// ...
19-
}
20-
21-
pub trait Table {
22-
fn insert(
16+
impl ObTableClient {
17+
// implement operation
18+
#[inline]
19+
pub async fn insert(
2320
&self,
2421
table_name: &str,
2522
row_keys: Vec<Value>,
2623
columns: Vec<String>,
2724
properties: Vec<Value>,
28-
) -> Result<i64>;
25+
) -> Result<i64> {}
2926

30-
fn update(
27+
#[inline]
28+
pub async fn update(
3129
&self,
3230
table_name: &str,
3331
row_keys: Vec<Value>,
3432
columns: Vec<String>,
3533
properties: Vec<Value>,
36-
) -> Result<i64>;
34+
) -> Result<i64> {}
3735
// ...
3836
}
3937
```
4038

4139
A simple operation example is shown below:
4240
```rust simple operation example
43-
fn simple_operation() {
41+
async fn simple_operation() {
4442
let client = build_normal_client();
4543

4644
let result = client.insert(
4745
"your_table_name",
4846
vec![Value::from("foo")],
4947
vec!["c2".to_owned()],
5048
vec![Value::from("baz")],
51-
);
49+
).await;
5250

5351
assert!(result.is_ok());
5452
}
@@ -72,7 +70,7 @@ impl ObTableBatchOperation {
7270
```
7371
A simple batch operation example is shown below:
7472
```rust batch operation example
75-
fn batch_operation() {
73+
async fn batch_operation() {
7674
let client = utils::common::build_normal_client();
7775

7876
// set number of operations in batch_op
@@ -87,7 +85,7 @@ fn batch_operation() {
8785
);
8886

8987
// execute
90-
let result = client.execute_batch("your_table_name", batch_op);
88+
let result = client.execute_batch("your_table_name", batch_op).await;
9189
assert!(result.is_ok());
9290
}
9391
```
@@ -97,30 +95,25 @@ More [demos](https://github.com/oceanbase/obkv-table-client-rs/blob/main/tests/t
9795
Query is different from get, it allows the user to get a range of data.
9896
A **Query** could get from **ObTableClient** by calling ```query()``` method, then you could customize your query by calling methods in **ObTableClientQueryImpl** and **TableQuery**.
9997
```rust ObTableClientQueryImpll
100-
impl TableQuery for ObTableClientQueryImpl {
101-
// implement methods from TableQuery
102-
// ...
103-
}
104-
105-
pub trait TableQuery {
106-
fn execute(&self) -> Result<QueryResultSet>;
107-
fn select(self, columns: Vec<String>) -> Self;
98+
impl ObTableClientQueryImpl {
99+
pub async fn execute(&self) -> Result<QueryResultSet> {}
100+
pub fn select(self, columns: Vec<String>) -> Self {}
108101
// ...
109-
fn clear(&mut self);
102+
pub fn clear(&mut self) {}
110103
}
111104
```
112105
A simple query example is shown below:
113106
```rust query example
114-
fn query() {
107+
async fn query() {
115108
let client = utils::common::build_normal_client();
116109

117110
let query = client
118111
.query("your_table_name")
119112
.select(vec!["c1".to_owned()])
120113
.scan_order(false)
121-
.add_scan_range(vec![Value::from("123")], true, vec![Value::from("567")], true)
114+
.add_scan_range(vec![Value::from("123")], true, vec![Value::from("567")], true);
122115

123-
let result = query.execute();
116+
let result = query.execute().await;
124117
assert!(result.is_ok());
125118
}
126119
```

src/client/mod.rs

+7-94
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
use std::{collections::HashMap, time::Duration};
1919

20-
use crate::{
21-
error::Result,
22-
rpc::protocol::{payloads::ObTableBatchOperation, DEFAULT_FLAG},
23-
serde_obkv::value::Value,
24-
};
20+
use crate::{rpc::protocol::DEFAULT_FLAG, serde_obkv::value::Value};
2521

2622
mod ocp;
2723
pub mod query;
@@ -35,83 +31,6 @@ pub enum TableOpResult {
3531
RetrieveRows(HashMap<String, Value>),
3632
}
3733

38-
pub trait Table {
39-
/// Insert a record
40-
fn insert(
41-
&self,
42-
table_name: &str,
43-
row_keys: Vec<Value>,
44-
columns: Vec<String>,
45-
properties: Vec<Value>,
46-
) -> Result<i64>;
47-
48-
/// Update a record
49-
fn update(
50-
&self,
51-
table_name: &str,
52-
row_keys: Vec<Value>,
53-
columns: Vec<String>,
54-
properties: Vec<Value>,
55-
) -> Result<i64>;
56-
57-
/// Insert or update a record, if the record exists, update it.
58-
/// Otherwise insert a new one.
59-
fn insert_or_update(
60-
&self,
61-
table_name: &str,
62-
row_keys: Vec<Value>,
63-
columns: Vec<String>,
64-
properties: Vec<Value>,
65-
) -> Result<i64>;
66-
67-
/// Replace a record.
68-
fn replace(
69-
&self,
70-
table_name: &str,
71-
row_keys: Vec<Value>,
72-
columns: Vec<String>,
73-
properties: Vec<Value>,
74-
) -> Result<i64>;
75-
76-
/// Append
77-
fn append(
78-
&self,
79-
table_name: &str,
80-
row_keys: Vec<Value>,
81-
columns: Vec<String>,
82-
properties: Vec<Value>,
83-
) -> Result<i64>;
84-
85-
/// Increment
86-
fn increment(
87-
&self,
88-
table_name: &str,
89-
row_keys: Vec<Value>,
90-
columns: Vec<String>,
91-
properties: Vec<Value>,
92-
) -> Result<i64>;
93-
94-
/// Delete records by row keys.
95-
fn delete(&self, table_name: &str, row_keys: Vec<Value>) -> Result<i64>;
96-
97-
/// Retrieve a record by row keys.
98-
fn get(
99-
&self,
100-
table_name: &str,
101-
row_keys: Vec<Value>,
102-
columns: Vec<String>,
103-
) -> Result<HashMap<String, Value>>;
104-
105-
/// Create a batch operation
106-
fn batch_operation(&self, ops_num_hint: usize) -> ObTableBatchOperation;
107-
// Execute a batch operation
108-
fn execute_batch(
109-
&self,
110-
table_name: &str,
111-
batch_op: ObTableBatchOperation,
112-
) -> Result<Vec<TableOpResult>>;
113-
}
114-
11534
/// ObTable client config
11635
#[derive(Clone, Debug, Eq, PartialEq)]
11736
pub struct ClientConfig {
@@ -136,8 +55,6 @@ pub struct ClientConfig {
13655
pub table_entry_refresh_try_interval: Duration,
13756
pub table_entry_refresh_continuous_failure_ceiling: usize,
13857

139-
pub table_batch_op_thread_num: usize,
140-
14158
pub server_address_priority_timeout: Duration,
14259
pub runtime_continuous_failure_ceiling: usize,
14360

@@ -152,12 +69,11 @@ pub struct ClientConfig {
15269

15370
pub max_conns_per_server: usize,
15471
pub min_idle_conns_per_server: usize,
155-
pub conn_init_thread_num: usize,
15672
pub query_concurrency_limit: Option<usize>,
15773

158-
pub conn_reader_thread_num: usize,
159-
pub conn_writer_thread_num: usize,
160-
pub default_thread_num: usize,
74+
pub tcp_recv_thread_num: usize,
75+
pub tcp_send_thread_num: usize,
76+
pub bg_thread_num: usize,
16177

16278
pub max_inflight_reqs_per_conn: usize,
16379

@@ -186,8 +102,6 @@ impl Default for ClientConfig {
186102
table_entry_refresh_try_interval: Duration::from_millis(20),
187103
table_entry_refresh_continuous_failure_ceiling: 10,
188104

189-
table_batch_op_thread_num: 16,
190-
191105
server_address_priority_timeout: Duration::from_secs(1800),
192106
runtime_continuous_failure_ceiling: 100,
193107

@@ -202,12 +116,11 @@ impl Default for ClientConfig {
202116

203117
max_conns_per_server: 10,
204118
min_idle_conns_per_server: 5,
205-
conn_init_thread_num: 2,
206119
query_concurrency_limit: None,
207120

208-
conn_reader_thread_num: 4,
209-
conn_writer_thread_num: 2,
210-
default_thread_num: 2,
121+
tcp_recv_thread_num: 4,
122+
tcp_send_thread_num: 2,
123+
bg_thread_num: 2,
211124

212125
max_inflight_reqs_per_conn: 100,
213126

0 commit comments

Comments
 (0)