Skip to content

[Feat] Async Interface #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions benches/concurrent_insert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

extern crate obkv;

use std::{sync::Arc, thread, time};
use std::{sync::Arc, time};

use obkv::{serde_obkv::value::Value, Builder, ObTableClient, RunningMode, Table};
use obkv::{serde_obkv::value::Value, Builder, ObTableClient, RunningMode};
use tokio::task;

// TODO: use test conf to control which environments to test.
const TEST_FULL_USER_NAME: &str = "test";
Expand Down Expand Up @@ -54,11 +55,11 @@ const TABLE_NAME: &str = "series_key_to_id_0";
// PRIMARY KEY(series_key),
// KEY index_id(series_id)
// );
fn concurrent_insert(client: Arc<ObTableClient>) {
async fn concurrent_insert(client: Arc<ObTableClient>) {
let mut thds = Vec::with_capacity(20);
for i in 0..50 {
let client = client.clone();
let thd = thread::spawn(move || {
let thd = task::spawn(async move {
for j in i * 100..(i * 100 + 50) {
let series_key = format!("series_key_test_padding_padding_{j}");
let series_id = j * j;
Expand All @@ -69,6 +70,7 @@ fn concurrent_insert(client: Arc<ObTableClient>) {
vec!["series_id".to_owned()],
vec![Value::from(series_id as i64)],
)
.await
.unwrap_or_else(|err| {
panic!("fail to insert row:{series_key} {series_id}, err:{err}")
});
Expand All @@ -78,18 +80,20 @@ fn concurrent_insert(client: Arc<ObTableClient>) {
}

for (i, thd) in thds.into_iter().enumerate() {
thd.join()
thd.await
.unwrap_or_else(|_| panic!("thread#{i} fail to join"));
}
}

fn main() {
let client = build_client(RunningMode::Normal);
#[tokio::main]
async fn main() {
let client_handle = task::spawn_blocking(|| build_client(RunningMode::Normal));
let client = client_handle.await.unwrap();
client
.truncate_table(TABLE_NAME)
.expect("fail to truncate the table");
let start = time::Instant::now();
concurrent_insert(Arc::new(client));
concurrent_insert(Arc::new(client)).await;
let elapsed = time::Instant::now() - start;
println!("Benches::concurrent_insert cost time:{elapsed:?}");
}
47 changes: 20 additions & 27 deletions docs/simple_demo/simple_operation/demo.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Demo for obkv-table-client-rs
Edited by OBKV developers on March 3, 2023.
Edited by OBKV developers on June 6, 2023.

## Introduction
obkv-table-client-rs is Rust Library that can access table data from OceanBase storage layer.
Expand All @@ -13,42 +13,40 @@ Now we provide an interface to access data from OceanBase, which we will introdu
obkv-table-client-rs support several simple operations, such as get, insert, update, insert_or_update, replace, append, increment, delete.

```rust Table and ObTableClient
impl Table for ObTableClient {
// implement operation in Table
// ...
}

pub trait Table {
fn insert(
impl ObTableClient {
// implement operation
#[inline]
pub async fn insert(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;
) -> Result<i64> {}

fn update(
#[inline]
pub async fn update(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;
) -> Result<i64> {}
// ...
}
```

A simple operation example is shown below:
```rust simple operation example
fn simple_operation() {
async fn simple_operation() {
let client = build_normal_client();

let result = client.insert(
"your_table_name",
vec![Value::from("foo")],
vec!["c2".to_owned()],
vec![Value::from("baz")],
);
).await;

assert!(result.is_ok());
}
Expand All @@ -72,7 +70,7 @@ impl ObTableBatchOperation {
```
A simple batch operation example is shown below:
```rust batch operation example
fn batch_operation() {
async fn batch_operation() {
let client = utils::common::build_normal_client();

// set number of operations in batch_op
Expand All @@ -87,7 +85,7 @@ fn batch_operation() {
);

// execute
let result = client.execute_batch("your_table_name", batch_op);
let result = client.execute_batch("your_table_name", batch_op).await;
assert!(result.is_ok());
}
```
Expand All @@ -97,30 +95,25 @@ More [demos](https://github.com/oceanbase/obkv-table-client-rs/blob/main/tests/t
Query is different from get, it allows the user to get a range of data.
A **Query** could get from **ObTableClient** by calling ```query()``` method, then you could customize your query by calling methods in **ObTableClientQueryImpl** and **TableQuery**.
```rust ObTableClientQueryImpll
impl TableQuery for ObTableClientQueryImpl {
// implement methods from TableQuery
// ...
}

pub trait TableQuery {
fn execute(&self) -> Result<QueryResultSet>;
fn select(self, columns: Vec<String>) -> Self;
impl ObTableClientQueryImpl {
pub async fn execute(&self) -> Result<QueryResultSet> {}
pub fn select(self, columns: Vec<String>) -> Self {}
// ...
fn clear(&mut self);
pub fn clear(&mut self) {}
}
```
A simple query example is shown below:
```rust query example
fn query() {
async fn query() {
let client = utils::common::build_normal_client();

let query = client
.query("your_table_name")
.select(vec!["c1".to_owned()])
.scan_order(false)
.add_scan_range(vec![Value::from("123")], true, vec![Value::from("567")], true)
.add_scan_range(vec![Value::from("123")], true, vec![Value::from("567")], true);

let result = query.execute();
let result = query.execute().await;
assert!(result.is_ok());
}
```
Expand Down
101 changes: 7 additions & 94 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

use std::{collections::HashMap, time::Duration};

use crate::{
error::Result,
rpc::protocol::{payloads::ObTableBatchOperation, DEFAULT_FLAG},
serde_obkv::value::Value,
};
use crate::{rpc::protocol::DEFAULT_FLAG, serde_obkv::value::Value};

mod ocp;
pub mod query;
Expand All @@ -35,83 +31,6 @@ pub enum TableOpResult {
RetrieveRows(HashMap<String, Value>),
}

pub trait Table {
/// Insert a record
fn insert(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Update a record
fn update(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Insert or update a record, if the record exists, update it.
/// Otherwise insert a new one.
fn insert_or_update(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Replace a record.
fn replace(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Append
fn append(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Increment
fn increment(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
) -> Result<i64>;

/// Delete records by row keys.
fn delete(&self, table_name: &str, row_keys: Vec<Value>) -> Result<i64>;

/// Retrieve a record by row keys.
fn get(
&self,
table_name: &str,
row_keys: Vec<Value>,
columns: Vec<String>,
) -> Result<HashMap<String, Value>>;

/// Create a batch operation
fn batch_operation(&self, ops_num_hint: usize) -> ObTableBatchOperation;
// Execute a batch operation
fn execute_batch(
&self,
table_name: &str,
batch_op: ObTableBatchOperation,
) -> Result<Vec<TableOpResult>>;
}

/// ObTable client config
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ClientConfig {
Expand All @@ -136,8 +55,6 @@ pub struct ClientConfig {
pub table_entry_refresh_try_interval: Duration,
pub table_entry_refresh_continuous_failure_ceiling: usize,

pub table_batch_op_thread_num: usize,

pub server_address_priority_timeout: Duration,
pub runtime_continuous_failure_ceiling: usize,

Expand All @@ -152,12 +69,11 @@ pub struct ClientConfig {

pub max_conns_per_server: usize,
pub min_idle_conns_per_server: usize,
pub conn_init_thread_num: usize,
pub query_concurrency_limit: Option<usize>,

pub conn_reader_thread_num: usize,
pub conn_writer_thread_num: usize,
pub default_thread_num: usize,
pub tcp_recv_thread_num: usize,
pub tcp_send_thread_num: usize,
pub bg_thread_num: usize,

pub max_inflight_reqs_per_conn: usize,

Expand Down Expand Up @@ -186,8 +102,6 @@ impl Default for ClientConfig {
table_entry_refresh_try_interval: Duration::from_millis(20),
table_entry_refresh_continuous_failure_ceiling: 10,

table_batch_op_thread_num: 16,

server_address_priority_timeout: Duration::from_secs(1800),
runtime_continuous_failure_ceiling: 100,

Expand All @@ -202,12 +116,11 @@ impl Default for ClientConfig {

max_conns_per_server: 10,
min_idle_conns_per_server: 5,
conn_init_thread_num: 2,
query_concurrency_limit: None,

conn_reader_thread_num: 4,
conn_writer_thread_num: 2,
default_thread_num: 2,
tcp_recv_thread_num: 4,
tcp_send_thread_num: 2,
bg_thread_num: 2,

max_inflight_reqs_per_conn: 100,

Expand Down
Loading