From 47ecbeadd2a4694b8c1a3c38f17e5d2719da94c0 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Thu, 11 May 2023 15:50:16 +0800 Subject: [PATCH 1/3] [Feat] add scan ycsb --- ycsb-rs/src/db.rs | 1 + ycsb-rs/src/obkv_client.rs | 19 ++++++++++++++++++- ycsb-rs/src/sqlite.rs | 5 +++++ ycsb-rs/src/workload/core_workload.rs | 11 +++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/ycsb-rs/src/db.rs b/ycsb-rs/src/db.rs index dfc2870..06288db 100644 --- a/ycsb-rs/src/db.rs +++ b/ycsb-rs/src/db.rs @@ -12,6 +12,7 @@ pub trait DB { fn insert(&self, table: &str, key: &str, values: &HashMap<&str, String>) -> Result<()>; fn read(&self, table: &str, key: &str, result: &mut HashMap) -> Result<()>; fn update(&self, table: &str, key: &str, values: &HashMap<&str, String>) -> Result<()>; + fn scan(&self, table: &str, startkey: &str, endkey: &str, values: &mut HashMap) -> Result<()>; } pub fn create_db(db: &str, config: Arc) -> Result> { diff --git a/ycsb-rs/src/obkv_client.rs b/ycsb-rs/src/obkv_client.rs index 2a86e1a..a2038f7 100644 --- a/ycsb-rs/src/obkv_client.rs +++ b/ycsb-rs/src/obkv_client.rs @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::Result; #[allow(unused)] use obkv::error::CommonErrCode; -use obkv::{Builder, ClientConfig, ObTableClient, RunningMode, Table, Value}; +use obkv::{Builder, ClientConfig, ObTableClient, RunningMode, Table, Value, TableQuery}; use crate::{db::DB, properties::Properties}; @@ -180,4 +180,21 @@ impl DB for OBKVClient { Ok(()) } + + #[allow(unused)] + fn scan(&self, table: &str, startkey: &str, endkey: &str, result: &mut HashMap) -> Result<()> { + let result = self + .client + .query(table) + .select(COLUMN_NAMES.iter().map(|s| s.to_string()).collect(),) + .primary_index() + .add_scan_range( + vec![Value::from(startkey)], + true, + vec![Value::from(endkey)], + true, + ).execute(); + assert!(result.is_ok()); + Ok(()) + } } diff --git a/ycsb-rs/src/sqlite.rs b/ycsb-rs/src/sqlite.rs index b53ae33..1cc1fb0 100644 --- a/ycsb-rs/src/sqlite.rs +++ b/ycsb-rs/src/sqlite.rs @@ -76,4 +76,9 @@ impl DB for SQLite { fn update(&self, table: &str, key: &str, values: &HashMap<&str, String>) -> Result<()> { todo!() } + + #[allow(unused_variables)] + fn scan(&self, table: &str, startkey: &str, endkey: &str, values: &mut HashMap) -> Result<()> { + todo!() + } } diff --git a/ycsb-rs/src/workload/core_workload.rs b/ycsb-rs/src/workload/core_workload.rs index 625491a..e578983 100644 --- a/ycsb-rs/src/workload/core_workload.rs +++ b/ycsb-rs/src/workload/core_workload.rs @@ -177,6 +177,14 @@ impl CoreWorkload { db.update(&self.table, &dbkey, &values).unwrap(); } + fn ob_transaction_scan(&self, db: Arc) { + let start = self.next_key_num(); + let dbstart = format!("{}", fnvhash64(start)); + let dbend = format!("{}", fnvhash64(start)); + let mut result = HashMap::new(); + db.scan(&self.table, &dbstart, &dbend, &mut result).unwrap(); + } + fn next_key_num(&self) -> u64 { // FIXME: Handle case where keychooser is an ExponentialGenerator. // FIXME: Handle case where keynum is > transactioninsertkeysequence's last @@ -267,6 +275,9 @@ impl Workload for CoreWorkload { CoreOperation::Update => { self.ob_transaction_update(db); } + CoreOperation::Scan => { + self.ob_transaction_scan(db); + }, _ => todo!(), } } From 97356fd70debf4a8e9522808bdf5ca5446531292 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Thu, 11 May 2023 16:34:45 +0800 Subject: [PATCH 2/3] [Fix] primary key prefix in key partition --- src/location/ob_part_desc.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/location/ob_part_desc.rs b/src/location/ob_part_desc.rs index 16b9ee5..b48181a 100644 --- a/src/location/ob_part_desc.rs +++ b/src/location/ob_part_desc.rs @@ -643,9 +643,14 @@ impl ObKeyPartDesc { is_min_max &= v.is_max(); } + // For partition key, the first part of the row key is the partition key + let part_ref_column_size = self + .ob_part_desc_obj + .ordered_part_ref_column_row_key_relations + .len(); // Note: Java / ODP may not query all the partitions, and will return an error // instead - if is_min_max || !self.is_equal_keys(start, end) { + if is_min_max || !self.is_equal_keys(&start[0..part_ref_column_size], &end[0..part_ref_column_size]) { let mut part_ids: Vec = Vec::with_capacity(self.part_num as usize); for i in 0..self.part_num as i64 { part_ids.push(i); From 9bcf8da5812f81b0f9a9641545bf284eafd9dec1 Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Thu, 11 May 2023 16:42:07 +0800 Subject: [PATCH 3/3] [Fix] cargo fmt --- src/location/ob_part_desc.rs | 7 ++++++- ycsb-rs/src/db.rs | 8 +++++++- ycsb-rs/src/obkv_client.rs | 15 +++++++++++---- ycsb-rs/src/sqlite.rs | 8 +++++++- ycsb-rs/src/workload/core_workload.rs | 2 +- 5 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/location/ob_part_desc.rs b/src/location/ob_part_desc.rs index b48181a..ef7a078 100644 --- a/src/location/ob_part_desc.rs +++ b/src/location/ob_part_desc.rs @@ -650,7 +650,12 @@ impl ObKeyPartDesc { .len(); // Note: Java / ODP may not query all the partitions, and will return an error // instead - if is_min_max || !self.is_equal_keys(&start[0..part_ref_column_size], &end[0..part_ref_column_size]) { + if is_min_max + || !self.is_equal_keys( + &start[0..part_ref_column_size], + &end[0..part_ref_column_size], + ) + { let mut part_ids: Vec = Vec::with_capacity(self.part_num as usize); for i in 0..self.part_num as i64 { part_ids.push(i); diff --git a/ycsb-rs/src/db.rs b/ycsb-rs/src/db.rs index 06288db..6da4590 100644 --- a/ycsb-rs/src/db.rs +++ b/ycsb-rs/src/db.rs @@ -12,7 +12,13 @@ pub trait DB { fn insert(&self, table: &str, key: &str, values: &HashMap<&str, String>) -> Result<()>; fn read(&self, table: &str, key: &str, result: &mut HashMap) -> Result<()>; fn update(&self, table: &str, key: &str, values: &HashMap<&str, String>) -> Result<()>; - fn scan(&self, table: &str, startkey: &str, endkey: &str, values: &mut HashMap) -> Result<()>; + fn scan( + &self, + table: &str, + startkey: &str, + endkey: &str, + values: &mut HashMap, + ) -> Result<()>; } pub fn create_db(db: &str, config: Arc) -> Result> { diff --git a/ycsb-rs/src/obkv_client.rs b/ycsb-rs/src/obkv_client.rs index a2038f7..bd59d59 100644 --- a/ycsb-rs/src/obkv_client.rs +++ b/ycsb-rs/src/obkv_client.rs @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::Result; #[allow(unused)] use obkv::error::CommonErrCode; -use obkv::{Builder, ClientConfig, ObTableClient, RunningMode, Table, Value, TableQuery}; +use obkv::{Builder, ClientConfig, ObTableClient, RunningMode, Table, TableQuery, Value}; use crate::{db::DB, properties::Properties}; @@ -182,18 +182,25 @@ impl DB for OBKVClient { } #[allow(unused)] - fn scan(&self, table: &str, startkey: &str, endkey: &str, result: &mut HashMap) -> Result<()> { + fn scan( + &self, + table: &str, + startkey: &str, + endkey: &str, + result: &mut HashMap, + ) -> Result<()> { let result = self .client .query(table) - .select(COLUMN_NAMES.iter().map(|s| s.to_string()).collect(),) + .select(COLUMN_NAMES.iter().map(|s| s.to_string()).collect()) .primary_index() .add_scan_range( vec![Value::from(startkey)], true, vec![Value::from(endkey)], true, - ).execute(); + ) + .execute(); assert!(result.is_ok()); Ok(()) } diff --git a/ycsb-rs/src/sqlite.rs b/ycsb-rs/src/sqlite.rs index 1cc1fb0..f9d2b33 100644 --- a/ycsb-rs/src/sqlite.rs +++ b/ycsb-rs/src/sqlite.rs @@ -78,7 +78,13 @@ impl DB for SQLite { } #[allow(unused_variables)] - fn scan(&self, table: &str, startkey: &str, endkey: &str, values: &mut HashMap) -> Result<()> { + fn scan( + &self, + table: &str, + startkey: &str, + endkey: &str, + values: &mut HashMap, + ) -> Result<()> { todo!() } } diff --git a/ycsb-rs/src/workload/core_workload.rs b/ycsb-rs/src/workload/core_workload.rs index e578983..f08a36a 100644 --- a/ycsb-rs/src/workload/core_workload.rs +++ b/ycsb-rs/src/workload/core_workload.rs @@ -277,7 +277,7 @@ impl Workload for CoreWorkload { } CoreOperation::Scan => { self.ob_transaction_scan(db); - }, + } _ => todo!(), } }