diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index a00c0b20fb9eb..93f933c92928c 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::fmt::Display; +use std::iter::FromIterator; use std::sync::Arc; use common_datavalues::chrono::DateTime; @@ -72,6 +73,7 @@ use common_meta_app::schema::UpsertTableCopiedFileReply; use common_meta_app::schema::UpsertTableCopiedFileReq; use common_meta_app::schema::UpsertTableOptionReply; use common_meta_app::schema::UpsertTableOptionReq; +use common_meta_app::share::ShareGrantObject; use common_meta_app::share::ShareGrantObjectPrivilege; use common_meta_app::share::ShareId; use common_meta_app::share::ShareNameIdent; @@ -81,6 +83,7 @@ use common_meta_types::app_error::CreateTableWithDropTime; use common_meta_types::app_error::DatabaseAlreadyExists; use common_meta_types::app_error::DropDbWithDropTime; use common_meta_types::app_error::DropTableWithDropTime; +use common_meta_types::app_error::ShareHasNoGrantedDatabase; use common_meta_types::app_error::ShareHasNoGrantedPrivilege; use common_meta_types::app_error::TableAlreadyExists; use common_meta_types::app_error::TableVersionMismatched; @@ -94,6 +97,7 @@ use common_meta_types::app_error::UnknownShareAccounts; use common_meta_types::app_error::UnknownTable; use common_meta_types::app_error::UnknownTableId; use common_meta_types::app_error::WrongShare; +use common_meta_types::app_error::WrongShareObject; use common_meta_types::ConditionResult; use common_meta_types::GCDroppedDataReply; use common_meta_types::GCDroppedDataReq; @@ -878,6 +882,13 @@ impl SchemaApi for KV { let (_, db_id, db_meta_seq, db_meta) = get_db_or_err(self, &tenant_dbname, "create_table").await?; + // cannot operate on shared database + if let Some(from_share) = db_meta.from_share { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedPrivilege( + ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name), + ))); + } + // Get table by tenant,db_id, table_name to assert absence. let dbid_tbname = DBIdTableName { @@ -1024,6 +1035,13 @@ impl SchemaApi for KV { let (_, db_id, db_meta_seq, db_meta) = get_db_or_err(self, &tenant_dbname, "drop_table").await?; + // cannot operate on shared database + if let Some(from_share) = db_meta.from_share { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedPrivilege( + ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name), + ))); + } + // Get table by tenant,db_id, table_name to assert presence. let dbid_tbname = DBIdTableName { @@ -1150,6 +1168,13 @@ impl SchemaApi for KV { let (_, db_id, db_meta_seq, db_meta) = get_db_or_err(self, &tenant_dbname, "undrop_table").await?; + // cannot operate on shared database + if let Some(from_share) = db_meta.from_share { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedPrivilege( + ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name), + ))); + } + // Get table by tenant,db_id, table_name to assert presence. let dbid_tbname = DBIdTableName { @@ -1304,6 +1329,13 @@ impl SchemaApi for KV { let (_, db_id, db_meta_seq, db_meta) = get_db_or_err(self, &tenant_dbname, "rename_table").await?; + // cannot operate on shared database + if let Some(from_share) = db_meta.from_share { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedPrivilege( + ShareHasNoGrantedPrivilege::new(&from_share.tenant, &from_share.share_name), + ))); + } + // Get table by db_id, table_name to assert presence. let dbid_tbname = DBIdTableName { @@ -1491,24 +1523,44 @@ impl SchemaApi for KV { // Get db by name to ensure presence - let (db_id_seq, db_id) = get_u64_value(self, &tenant_dbname).await?; - debug!(db_id_seq, db_id, ?tenant_dbname_tbname, "get database"); - - db_has_to_exist( - db_id_seq, + let res = get_db_or_err( + self, &tenant_dbname, - format!("get_table: {}", tenant_dbname_tbname), - )?; + format!("get_table: {}", tenant_dbname), + ) + .await; + + let (_db_id_seq, db_id, _db_meta_seq, db_meta) = match res { + Ok(x) => x, + Err(e) => { + return Err(e); + } + }; - // Get table by tenant,db_id, table_name to assert presence. + let table_id = match db_meta.from_share { + Some(share) => { + get_table_id_from_share_by_name( + self, + share, + db_id, + &tenant_dbname_tbname.table_name, + ) + .await? + } + None => { + // Get table by tenant,db_id, table_name to assert presence. - let dbid_tbname = DBIdTableName { - db_id, - table_name: tenant_dbname_tbname.table_name.clone(), - }; + let dbid_tbname = DBIdTableName { + db_id, + table_name: tenant_dbname_tbname.table_name.clone(), + }; + + let (tb_id_seq, table_id) = get_u64_value(self, &dbid_tbname).await?; + table_has_to_exist(tb_id_seq, tenant_dbname_tbname, "get_table")?; - let (tb_id_seq, table_id) = get_u64_value(self, &dbid_tbname).await?; - table_has_to_exist(tb_id_seq, tenant_dbname_tbname, "get_table")?; + table_id + } + }; let tbid = TableId { table_id }; @@ -1517,7 +1569,7 @@ impl SchemaApi for KV { table_has_to_exist( tb_meta_seq, tenant_dbname_tbname, - format!("get_table meta by: {}", tbid), + format!("get_table meta by: {}", tenant_dbname_tbname), )?; debug!( @@ -1529,7 +1581,7 @@ impl SchemaApi for KV { let tb_info = TableInfo { ident: TableIdent { - table_id, + table_id: tbid.table_id, seq: tb_meta_seq, }, desc: tenant_dbname_tbname.to_string(), @@ -1639,64 +1691,24 @@ impl SchemaApi for KV { let tenant_dbname = &req.inner; // Get db by name to ensure presence - - let (db_id_seq, db_id) = get_u64_value(self, tenant_dbname).await?; - debug!( - db_id_seq, - db_id, - ?tenant_dbname, - "get database for listing table" - ); - - db_has_to_exist(db_id_seq, tenant_dbname, "list_tables")?; - - // List tables by tenant, db_id, table_name. - - let dbid_tbname = DBIdTableName { - db_id, - // Use empty name to scan all tables - table_name: "".to_string(), + let res = get_db_or_err( + self, + tenant_dbname, + format!("list_tables: {}", &tenant_dbname), + ) + .await; + + let (_db_id_seq, db_id, _db_meta_seq, db_meta) = match res { + Ok(x) => x, + Err(e) => { + return Err(e); + } }; - let (dbid_tbnames, ids) = list_u64_value(self, &dbid_tbname).await?; - - let mut tb_meta_keys = Vec::with_capacity(ids.len()); - for (i, _name_key) in dbid_tbnames.iter().enumerate() { - let tbid = TableId { table_id: ids[i] }; - - tb_meta_keys.push(tbid.to_key()); - } - - // mget() corresponding table_metas - - let seq_tb_metas = self.mget_kv(&tb_meta_keys).await?; - - let mut tb_infos = Vec::with_capacity(ids.len()); - - for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() { - if let Some(seq_meta) = seq_meta_opt { - let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?; - - let tb_info = TableInfo { - ident: TableIdent { - table_id: ids[i], - seq: seq_meta.seq, - }, - desc: format!( - "'{}'.'{}'", - tenant_dbname.db_name, dbid_tbnames[i].table_name - ), - meta: tb_meta, - name: dbid_tbnames[i].table_name.clone(), - }; - tb_infos.push(Arc::new(tb_info)); - } else { - debug!( - k = display(&tb_meta_keys[i]), - "db_meta not found, maybe just deleted after listing names and before listing meta" - ); - } - } + let tb_infos = match db_meta.from_share { + None => list_tables_from_unshare_db(self, db_id, tenant_dbname).await?, + Some(share) => list_tables_from_share_db(self, share, db_id, tenant_dbname).await?, + }; Ok(tb_infos) } @@ -2520,3 +2532,183 @@ async fn count_tables(kv_api: &impl KVApi, key: &CountTablesKey) -> Result Result { + let res = get_share_or_err( + kv_api, + &share, + format!("list_tables_from_share_db: {}", &share), + ) + .await; + + let (share_id_seq, _share_id, _share_meta_seq, share_meta) = match res { + Ok(x) => x, + Err(e) => { + return Err(e); + } + }; + if share_id_seq == 0 { + return Err(MetaError::AppError(AppError::WrongShare(WrongShare::new( + share.to_string(), + )))); + } + if !share_meta.share_from_db_ids.contains(&db_id) { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedDatabase( + ShareHasNoGrantedDatabase::new(&share.tenant, &share.share_name), + ))); + } + + let mut ids = Vec::with_capacity(share_meta.entries.len()); + for (_, entry) in share_meta.entries.iter() { + if let ShareGrantObject::Table(table_id) = entry.object { + ids.push(table_id); + } + } + + let table_names = get_table_names_by_ids(kv_api, &ids).await?; + match table_names.binary_search(table_name) { + Ok(i) => Ok(ids[i]), + Err(_) => Err(MetaError::AppError(AppError::WrongShareObject( + WrongShareObject::new(table_name.to_string()), + ))), + } +} + +async fn get_table_names_by_ids( + kv_api: &impl KVApi, + ids: &[u64], +) -> Result, MetaError> { + let mut table_names = vec![]; + + for id in ids.iter() { + let key = TableIdToName { table_id: *id }; + let (_table_id_to_name_seq, table_name_opt): (_, Option) = + get_struct_value(kv_api, &key).await?; + + match table_name_opt { + Some(table_name) => table_names.push(table_name.table_name), + None => { + return Err(MetaError::AppError(AppError::UnknownTableId( + UnknownTableId::new(*id, "get_table_names_by_ids"), + ))); + } + } + } + Ok(table_names) +} + +async fn get_tableinfos_by_ids( + kv_api: &impl KVApi, + ids: &[u64], + tenant_dbname: &DatabaseNameIdent, + dbid_tbnames_opt: Option>, +) -> Result>, MetaError> { + let mut tb_meta_keys = Vec::with_capacity(ids.len()); + for id in ids.iter() { + let tbid = TableId { table_id: *id }; + + tb_meta_keys.push(tbid.to_key()); + } + + // mget() corresponding table_metas + + let seq_tb_metas = kv_api.mget_kv(&tb_meta_keys).await?; + + let mut tb_infos = Vec::with_capacity(ids.len()); + + let tbnames = match dbid_tbnames_opt { + Some(dbid_tbnames) => Vec::::from_iter( + dbid_tbnames + .into_iter() + .map(|dbid_tbname| dbid_tbname.table_name), + ), + + None => get_table_names_by_ids(kv_api, ids).await?, + }; + + for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() { + if let Some(seq_meta) = seq_meta_opt { + let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?; + + let tb_info = TableInfo { + ident: TableIdent { + table_id: ids[i], + seq: seq_meta.seq, + }, + desc: format!("'{}'.'{}'", tenant_dbname.db_name, tbnames[i]), + meta: tb_meta, + name: tbnames[i].clone(), + }; + tb_infos.push(Arc::new(tb_info)); + } else { + debug!( + k = display(&tb_meta_keys[i]), + "db_meta not found, maybe just deleted after listing names and before listing meta" + ); + } + } + + Ok(tb_infos) +} + +async fn list_tables_from_unshare_db( + kv_api: &impl KVApi, + db_id: u64, + tenant_dbname: &DatabaseNameIdent, +) -> Result>, MetaError> { + // List tables by tenant, db_id, table_name. + + let dbid_tbname = DBIdTableName { + db_id, + // Use empty name to scan all tables + table_name: "".to_string(), + }; + + let (dbid_tbnames, ids) = list_u64_value(kv_api, &dbid_tbname).await?; + + get_tableinfos_by_ids(kv_api, &ids, tenant_dbname, Some(dbid_tbnames)).await +} + +async fn list_tables_from_share_db( + kv_api: &impl KVApi, + share: ShareNameIdent, + db_id: u64, + tenant_dbname: &DatabaseNameIdent, +) -> Result>, MetaError> { + let res = get_share_or_err( + kv_api, + &share, + format!("list_tables_from_share_db: {}", &share), + ) + .await; + + let (share_id_seq, _share_id, _share_meta_seq, share_meta) = match res { + Ok(x) => x, + Err(e) => { + return Err(e); + } + }; + if share_id_seq == 0 { + return Err(MetaError::AppError(AppError::WrongShare(WrongShare::new( + share.to_string(), + )))); + } + if !share_meta.share_from_db_ids.contains(&db_id) { + return Err(MetaError::AppError(AppError::ShareHasNoGrantedDatabase( + ShareHasNoGrantedDatabase::new(&share.tenant, &share.share_name), + ))); + } + + let mut ids = Vec::with_capacity(share_meta.entries.len()); + for (_, entry) in share_meta.entries.iter() { + if let ShareGrantObject::Table(table_id) = entry.object { + ids.push(table_id); + } + } + get_tableinfos_by_ids(kv_api, &ids, tenant_dbname, None).await +} diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 368db579e020e..ad5530a7199f4 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -253,6 +253,7 @@ impl SchemaApiTestSuite { suite.get_table_by_id(&b.build().await).await?; suite.get_table_copied_file(&b.build().await).await?; suite.truncate_table(&b.build().await).await?; + suite.get_tables_from_share(&b.build().await).await?; Ok(()) } @@ -3376,6 +3377,165 @@ impl SchemaApiTestSuite { Ok(()) } + async fn get_tables_from_share( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant1 = "tenant1"; + let tenant2 = "tenant2"; + let db1 = "db1"; + let db2 = "db2"; + let share = "share"; + let tb1 = "tb1"; + let tb2 = "tb2"; + let share_name = ShareNameIdent { + tenant: tenant1.to_string(), + share_name: share.to_string(), + }; + let db_name1 = DatabaseNameIdent { + tenant: tenant1.to_string(), + db_name: db1.to_string(), + }; + let db_name2 = DatabaseNameIdent { + tenant: tenant2.to_string(), + db_name: db2.to_string(), + }; + let tb_name1 = TableNameIdent { + tenant: tenant1.to_string(), + db_name: db1.to_string(), + table_name: tb1.to_string(), + }; + let tb_name2 = TableNameIdent { + tenant: tenant1.to_string(), + db_name: db1.to_string(), + table_name: tb2.to_string(), + }; + let tb_names = vec![&tb_name1, &tb_name2]; + let mut share_table_id = 0; + + info!("--- create a share and grant access to db and table"); + { + let create_on = Utc::now(); + let share_on = Utc::now(); + let req = CreateShareReq { + if_not_exists: false, + share_name: share_name.clone(), + comment: None, + create_on, + }; + + let _ = mt.create_share(req).await?; + + // create share db + let req = CreateDatabaseReq { + if_not_exists: false, + name_ident: db_name1.clone(), + meta: DatabaseMeta { + ..Default::default() + }, + }; + let _ = mt.create_database(req).await?; + + // create share table + let schema = || { + Arc::new(DataSchema::new(vec![DataField::new( + "number", + u64::to_data_type(), + )])) + }; + let table_meta = |created_on| TableMeta { + schema: schema(), + engine: "JSON".to_string(), + options: BTreeMap::new(), + updated_on: created_on, + created_on, + ..TableMeta::default() + }; + for tb_name in tb_names { + let req = CreateTableReq { + if_not_exists: false, + name_ident: tb_name.clone(), + table_meta: table_meta(create_on), + }; + let res = mt.create_table(req).await?; + if tb_name == &tb_name1 { + share_table_id = res.table_id; + } + } + + // grant the tenant2 to access the share + let req = AddShareAccountsReq { + share_name: share_name.clone(), + share_on, + if_exists: false, + accounts: vec![tenant2.to_string()], + }; + + let res = mt.add_share_tenants(req).await; + assert!(res.is_ok()); + + // grant access to database and table1 + let req = GrantShareObjectReq { + share_name: share_name.clone(), + object: ShareGrantObjectName::Database(db1.to_string()), + grant_on: create_on, + privilege: ShareGrantObjectPrivilege::Usage, + }; + let _ = mt.grant_share_object(req).await?; + + let req = GrantShareObjectReq { + share_name: share_name.clone(), + object: ShareGrantObjectName::Table(db1.to_string(), tb1.to_string()), + grant_on: create_on, + privilege: ShareGrantObjectPrivilege::Select, + }; + let _ = mt.grant_share_object(req).await?; + } + + info!("--- create a share db"); + { + let req = CreateDatabaseReq { + if_not_exists: false, + name_ident: db_name2.clone(), + meta: DatabaseMeta { + from_share: Some(share_name.clone()), + ..Default::default() + }, + }; + let _ = mt.create_database(req).await?; + }; + + info!("--- list tables from share db"); + { + let res = mt.list_tables(ListTableReq::new(tenant2, db2)).await; + assert!(res.is_ok()); + let res = res.unwrap(); + assert_eq!(res.len(), 1); + // since share has not grant access to tb2, so list tables only return tb1. + let table_info = &res[0]; + assert_eq!(table_info.name, tb1.to_string()); + assert_eq!(table_info.ident.table_id, share_table_id); + } + + info!("--- get tables from share db"); + { + let got = mt.get_table((tenant2, db2, tb1).into()).await; + assert!(got.is_ok()); + let got = got.unwrap(); + assert_eq!(got.ident.table_id, share_table_id); + assert_eq!(got.name, tb1.to_string()); + + let got = mt.get_table((tenant2, db2, tb2).into()).await; + assert!(got.is_err()); + assert_eq!( + ErrorCode::from(got.unwrap_err()).code(), + ErrorCode::WrongShareObject("").code() + ); + } + + Ok(()) + } + #[tracing::instrument(level = "debug", skip_all)] async fn table_list(&self, mt: &MT) -> anyhow::Result<()> { let tenant = "tenant1";