From fda25316bbd51161d16c47596608ade51bd15652 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 14:43:39 +0800 Subject: [PATCH 1/6] refactor: unify table commit path --- .../storages/fuse-meta/src/meta/versions.rs | 2 +- src/query/storages/fuse/src/fuse_table.rs | 133 ++++++++++-------- .../storages/fuse/src/operations/commit.rs | 77 +++++++--- .../storages/fuse/src/operations/delete.rs | 10 +- .../src/operations/mutation/base_mutator.rs | 2 - .../operations/mutation/compact_mutator.rs | 7 +- .../operations/mutation/recluster_mutator.rs | 10 +- 7 files changed, 137 insertions(+), 104 deletions(-) diff --git a/src/query/storages/fuse-meta/src/meta/versions.rs b/src/query/storages/fuse-meta/src/meta/versions.rs index e8413ce3bae2b..6c321b4ce7d63 100644 --- a/src/query/storages/fuse-meta/src/meta/versions.rs +++ b/src/query/storages/fuse-meta/src/meta/versions.rs @@ -30,7 +30,7 @@ use crate::meta::Versioned; // impl Versioned<0> for v0::SegmentInfo {} // impl Versioned<1> for v0::SegmentInfo {} // -// Fortunately, since v0::SegmentInfo::VESION is used in +// Fortunately, since v0::SegmentInfo::VERSION is used in // several places, compiler will report compile error if it // can not deduce a unique value the constant expression. diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 4182a2434f04f..039b637afec56 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -16,23 +16,18 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use common_cache::Cache; use common_catalog::catalog::StorageDescription; use common_catalog::table_context::TableContext; use common_catalog::table_mutator::TableMutator; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; -use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::ClusterKey; use common_fuse_meta::meta::Statistics as FuseStatistics; use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; use common_legacy_parser::ExpressionParser; use common_meta_app::schema::TableInfo; -use common_meta_app::schema::TableMeta; -use common_meta_app::schema::UpdateTableMetaReq; -use common_meta_types::MatchSeq; use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; @@ -43,7 +38,6 @@ use common_planners::TruncateTablePlan; use common_storages_util::storage_context::StorageContext; use uuid::Uuid; -use crate::io::write_meta; use crate::io::BlockCompactor; use crate::io::MetaReaders; use crate::io::TableMetaLocationGenerator; @@ -174,52 +168,52 @@ impl FuseTable { } } - pub async fn update_table_meta( - &self, - ctx: &dyn TableContext, - catalog_name: &str, - snapshot: &TableSnapshot, - meta: &mut TableMeta, - ) -> Result<()> { - let uuid = snapshot.snapshot_id; - let snapshot_loc = self - .meta_location_generator() - .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; - let operator = ctx.get_storage_operator()?; - write_meta(&operator, &snapshot_loc, snapshot).await?; - - // set new snapshot location - meta.options - .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone()); - // remove legacy options - meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); - - let table_id = self.table_info.ident.table_id; - let table_version = self.table_info.ident.seq; - let req = UpdateTableMetaReq { - table_id, - seq: MatchSeq::Exact(table_version), - new_table_meta: meta.clone(), - }; - - let catalog = ctx.get_catalog(catalog_name)?; - let result = catalog.update_table_meta(req).await; - match result { - Ok(_) => { - if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { - let cache = &mut snapshot_cache.write().await; - cache.put(snapshot_loc, Arc::new(snapshot.clone())); - } - Ok(()) - } - Err(e) => { - // commit snapshot to meta server failed, try to delete it. - // "major GC" will collect this, if deletion failure (even after DAL retried) - let _ = operator.object(&snapshot_loc).delete().await; - Err(e) - } - } - } + // pub async fn update_table_meta( + // &self, + // ctx: &dyn TableContext, + // catalog_name: &str, + // snapshot: &TableSnapshot, + // meta: &mut TableMeta, + // ) -> Result<()> { + // let uuid = snapshot.snapshot_id; + // let snapshot_loc = self + // .meta_location_generator() + // .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; + // let operator = ctx.get_storage_operator()?; + // write_meta(&operator, &snapshot_loc, snapshot).await?; + // + // // set new snapshot location + // meta.options + // .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone()); + // // remove legacy options + // meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); + // + // let table_id = self.table_info.ident.table_id; + // let table_version = self.table_info.ident.seq; + // let req = UpdateTableMetaReq { + // table_id, + // seq: MatchSeq::Exact(table_version), + // new_table_meta: meta.clone(), + // }; + // + // let catalog = ctx.get_catalog(catalog_name)?; + // let result = catalog.update_table_meta(req).await; + // match result { + // Ok(_) => { + // if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { + // let cache = &mut snapshot_cache.write().await; + // cache.put(snapshot_loc, Arc::new(snapshot.clone())); + // } + // Ok(()) + // } + // Err(e) => { + // // commit snapshot to meta server failed, try to delete it. + // // "major GC" will collect this, if deletion failure (even after DAL retried) + // let _ = operator.object(&snapshot_loc).delete().await; + // Err(e) + // } + // } + // } pub fn transient(&self) -> bool { self.table_info.meta.options.contains_key("TRANSIENT") @@ -287,8 +281,9 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; + let new_snapshot_id = Uuid::new_v4(); let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), + new_snapshot_id, &prev_timestamp, prev_snapshot_id, schema, @@ -297,13 +292,21 @@ impl Table for FuseTable { cluster_key_meta, ); - self.update_table_meta( + // write down the new snapshot + let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( + &new_snapshot.snapshot_id, + new_snapshot.format_version(), + )?; + + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &self.table_info, + snapshot_loc, + new_snapshot, ) - .await + .await?; + Ok(()) } async fn drop_table_cluster_keys( @@ -331,8 +334,9 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; + let new_snapshot_id = Uuid::new_v4(); let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), + new_snapshot_id, &prev_timestamp, prev_snapshot_id, schema, @@ -341,11 +345,18 @@ impl Table for FuseTable { None, ); - self.update_table_meta( + // write down the new snapshot + let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( + &new_snapshot.snapshot_id, + new_snapshot.format_version(), + )?; + + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &self.table_info, + snapshot_loc, + new_snapshot, ) .await } diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 2ba7c1e1e2fa1..12a78a7796a2d 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -19,11 +19,13 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; use common_base::base::ProgressValues; +use common_cache::Cache; use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; +use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::ClusterKey; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SegmentInfo; @@ -32,7 +34,6 @@ use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableStatistics; -use common_meta_app::schema::UpdateTableMetaReply; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; use tracing::debug; @@ -40,6 +41,7 @@ use tracing::info; use tracing::warn; use uuid::Uuid; +use crate::io::write_meta; use crate::operations::AppendOperationLogEntry; use crate::operations::TableOperationLog; use crate::statistics; @@ -177,9 +179,11 @@ impl FuseTable { .into_iter() .map(|loc| (loc, SegmentInfo::VERSION)) .collect(); + + let new_snapshot_id = Uuid::new_v4(); let new_snapshot = if overwrite { TableSnapshot::new( - Uuid::new_v4(), + new_snapshot_id, &prev_timestamp, prev.as_ref().map(|v| (v.snapshot_id, prev_version)), schema, @@ -207,11 +211,18 @@ impl FuseTable { index_data_bytes: new_snapshot.summary.index_size, }; - self.update_table_meta( + // write down the new snapshot + let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( + &new_snapshot.snapshot_id, + new_snapshot.format_version(), + )?; + + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &self.table_info, + snapshot_loc, + new_snapshot, ) .await } @@ -256,24 +267,25 @@ impl FuseTable { ctx: &dyn TableContext, catalog_name: &str, table_info: &TableInfo, - new_snapshot_location: String, - stats: &Statistics, - ) -> Result { - let catalog = ctx.get_catalog(catalog_name)?; - - let table_id = table_info.ident.table_id; - let table_version = table_info.ident.seq; + snapshot_location: String, + snapshot: TableSnapshot, + ) -> Result<()> { + // 1. write down snapshot + let operator = ctx.get_storage_operator()?; + write_meta(&operator, &snapshot_location, &snapshot).await?; + // 2. prepare table meta let mut new_table_meta = table_info.meta.clone(); - - // set new snapshot location - new_table_meta - .options - .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), new_snapshot_location); - + // 2.1 set new snapshot location + new_table_meta.options.insert( + OPT_KEY_SNAPSHOT_LOCATION.to_owned(), + snapshot_location.clone(), + ); // remove legacy options self::utils::remove_legacy_options(&mut new_table_meta.options); + // 2.2 setup table statistics + let stats = &snapshot.summary; // update statistics new_table_meta.statistics = TableStatistics { number_of_rows: stats.row_count, @@ -282,13 +294,35 @@ impl FuseTable { index_data_bytes: stats.index_size, }; + // 3. prepare the request + + let catalog = ctx.get_catalog(catalog_name)?; + let table_id = table_info.ident.table_id; + let table_version = table_info.ident.seq; + let req = UpdateTableMetaReq { table_id, seq: MatchSeq::Exact(table_version), new_table_meta, }; - catalog.update_table_meta(req).await + // 3. let's roll + let reply = catalog.update_table_meta(req).await; + match reply { + Ok(_) => { + if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { + let cache = &mut snapshot_cache.write().await; + cache.put(snapshot_location, Arc::new(snapshot)); + } + Ok(()) + } + Err(e) => { + // commit snapshot to meta server failed, try to delete it. + // "major GC" will collect this, if deletion failure (even after DAL retried) + let _ = operator.object(&snapshot_location).delete().await; + Err(e) + } + } } pub fn merge_append_operations( @@ -333,6 +367,11 @@ impl FuseTable { }; catalog.get_table_by_info(&table_info) } + + // Left a hint file which indicates the location of the latest snapshot + // async fn write_last_snapshot_hint(_ctx: &dyn TableContext, _last_snapshot_path: String) { + // todo!() + //} } mod utils { diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 7b917ba4d20cc..84697e33930a0 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -126,14 +126,8 @@ impl FuseTable { catalog_name: &str, ) -> Result<()> { let (new_snapshot, loc) = del_holder.into_new_snapshot().await?; - Self::commit_to_meta_server( - ctx, - catalog_name, - self.get_table_info(), - loc, - &new_snapshot.summary, - ) - .await?; + Self::commit_to_meta_server(ctx, catalog_name, self.get_table_info(), loc, new_snapshot) + .await?; // TODO check if error is recoverable, and try to resolve the conflict Ok(()) } diff --git a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs index 7169d0badeb26..c0a9dca9c7df1 100644 --- a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs @@ -26,7 +26,6 @@ use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; use opendal::Operator; -use crate::io::write_meta; use crate::io::MetaReaders; use crate::io::SegmentWriter; use crate::io::TableMetaLocationGenerator; @@ -96,7 +95,6 @@ impl BaseMutator { &new_snapshot.snapshot_id, new_snapshot.format_version(), )?; - write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?; Ok((new_snapshot, snapshot_loc)) } diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs index c506b5fd1a6ab..315b76f18a38f 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs @@ -25,7 +25,6 @@ use common_fuse_meta::meta::Versioned; use common_meta_app::schema::TableInfo; use opendal::Operator; -use crate::io::write_meta; use crate::io::BlockCompactor; use crate::io::MetaReaders; use crate::io::SegmentWriter; @@ -183,16 +182,14 @@ impl TableMutator for CompactMutator { &new_snapshot.snapshot_id, new_snapshot.format_version(), )?; - write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?; FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, table_info, snapshot_loc, - &new_snapshot.summary, + new_snapshot, ) - .await?; - Ok(()) + .await } } diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index 2b9340864b11b..f96540694ad55 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -220,14 +220,8 @@ impl TableMutator for ReclusterMutator { let (new_snapshot, loc) = base_mutator.into_new_snapshot(segments, summary).await?; - FuseTable::commit_to_meta_server( - ctx.as_ref(), - catalog_name, - table_info, - loc, - &new_snapshot.summary, - ) - .await?; + FuseTable::commit_to_meta_server(ctx.as_ref(), catalog_name, table_info, loc, new_snapshot) + .await?; Ok(()) } } From bac5329346b3e8ac9cbd3f4848925248d88c8709 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 20:12:11 +0800 Subject: [PATCH 2/6] refactor: generate snapshot location in commit phase --- .../operations/mutation/deletion_mutator.rs | 2 +- src/query/storages/fuse/src/fuse_table.rs | 63 +------------------ src/query/storages/fuse/src/io/locations.rs | 9 ++- .../storages/fuse/src/operations/commit.rs | 14 ++--- .../storages/fuse/src/operations/delete.rs | 12 +++- .../src/operations/mutation/base_mutator.rs | 9 +-- .../operations/mutation/compact_mutator.rs | 8 +-- .../operations/mutation/deletion_mutator.rs | 2 +- .../operations/mutation/recluster_mutator.rs | 14 +++-- 9 files changed, 40 insertions(+), 93 deletions(-) diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs index a3895f9c26bc7..a50571a733c6f 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs @@ -110,7 +110,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> { } } - let (new_snapshot, _) = mutator.into_new_snapshot().await?; + let new_snapshot = mutator.into_new_snapshot().await?; // half segments left after deletion assert_eq!(new_snapshot.segments.len(), 50); diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 039b637afec56..2bac2120c4800 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -168,53 +168,6 @@ impl FuseTable { } } - // pub async fn update_table_meta( - // &self, - // ctx: &dyn TableContext, - // catalog_name: &str, - // snapshot: &TableSnapshot, - // meta: &mut TableMeta, - // ) -> Result<()> { - // let uuid = snapshot.snapshot_id; - // let snapshot_loc = self - // .meta_location_generator() - // .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; - // let operator = ctx.get_storage_operator()?; - // write_meta(&operator, &snapshot_loc, snapshot).await?; - // - // // set new snapshot location - // meta.options - // .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone()); - // // remove legacy options - // meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); - // - // let table_id = self.table_info.ident.table_id; - // let table_version = self.table_info.ident.seq; - // let req = UpdateTableMetaReq { - // table_id, - // seq: MatchSeq::Exact(table_version), - // new_table_meta: meta.clone(), - // }; - // - // let catalog = ctx.get_catalog(catalog_name)?; - // let result = catalog.update_table_meta(req).await; - // match result { - // Ok(_) => { - // if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { - // let cache = &mut snapshot_cache.write().await; - // cache.put(snapshot_loc, Arc::new(snapshot.clone())); - // } - // Ok(()) - // } - // Err(e) => { - // // commit snapshot to meta server failed, try to delete it. - // // "major GC" will collect this, if deletion failure (even after DAL retried) - // let _ = operator.object(&snapshot_loc).delete().await; - // Err(e) - // } - // } - // } - pub fn transient(&self) -> bool { self.table_info.meta.options.contains_key("TRANSIENT") } @@ -292,17 +245,11 @@ impl Table for FuseTable { cluster_key_meta, ); - // write down the new snapshot - let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, &self.table_info, - snapshot_loc, + &self.meta_location_generator, new_snapshot, ) .await?; @@ -345,17 +292,11 @@ impl Table for FuseTable { None, ); - // write down the new snapshot - let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, &self.table_info, - snapshot_loc, + &self.meta_location_generator, new_snapshot, ) .await diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 4b03349de9203..d5980f958d55b 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -75,7 +75,7 @@ impl TableMetaLocationGenerator { ) } - pub fn gen_segment_info_location(&self) -> String where { + pub fn gen_segment_info_location(&self) -> String { let segment_uuid = Uuid::new_v4().simple().to_string(); format!( "{}/{}/{}_v{}.json", @@ -98,6 +98,13 @@ impl TableMetaLocationGenerator { SNAPSHOT_V0.version() } } + + pub fn gen_last_snapshot_hint_location(&self) -> String { + format!( + "{}/{}/last_snapshot_location_hint", + &self.prefix, FUSE_TBL_SNAPSHOT_PREFIX, + ) + } } trait SnapshotLocationCreator { diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 12a78a7796a2d..22eb3a84a2c6e 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -42,6 +42,7 @@ use tracing::warn; use uuid::Uuid; use crate::io::write_meta; +use crate::io::TableMetaLocationGenerator; use crate::operations::AppendOperationLogEntry; use crate::operations::TableOperationLog; use crate::statistics; @@ -211,17 +212,11 @@ impl FuseTable { index_data_bytes: new_snapshot.summary.index_size, }; - // write down the new snapshot - let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, &self.table_info, - snapshot_loc, + &self.meta_location_generator, new_snapshot, ) .await @@ -267,9 +262,12 @@ impl FuseTable { ctx: &dyn TableContext, catalog_name: &str, table_info: &TableInfo, - snapshot_location: String, + location_generator: &TableMetaLocationGenerator, snapshot: TableSnapshot, ) -> Result<()> { + let snapshot_location = location_generator + .snapshot_location_from_uuid(&snapshot.snapshot_id, snapshot.format_version())?; + // 1. write down snapshot let operator = ctx.get_storage_operator()?; write_meta(&operator, &snapshot_location, &snapshot).await?; diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 84697e33930a0..d1d1dfce85977 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -125,9 +125,15 @@ impl FuseTable { del_holder: DeletionMutator, catalog_name: &str, ) -> Result<()> { - let (new_snapshot, loc) = del_holder.into_new_snapshot().await?; - Self::commit_to_meta_server(ctx, catalog_name, self.get_table_info(), loc, new_snapshot) - .await?; + let new_snapshot = del_holder.into_new_snapshot().await?; + Self::commit_to_meta_server( + ctx, + catalog_name, + self.get_table_info(), + &self.meta_location_generator, + new_snapshot, + ) + .await?; // TODO check if error is recoverable, and try to resolve the conflict Ok(()) } diff --git a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs index c0a9dca9c7df1..fd3df6dca1f15 100644 --- a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs @@ -85,17 +85,12 @@ impl BaseMutator { self, segments: Vec, summary: Statistics, - ) -> Result<(TableSnapshot, String)> { + ) -> Result { let snapshot = self.base_snapshot; let mut new_snapshot = TableSnapshot::from_previous(&snapshot); new_snapshot.segments = segments; new_snapshot.summary = summary; - // write down the new snapshot - let snapshot_loc = self.location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - Ok((new_snapshot, snapshot_loc)) + Ok(new_snapshot) } pub async fn generate_segments(&self) -> Result<(Vec, Statistics)> { diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs index 315b76f18a38f..1a721a30235d7 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs @@ -177,17 +177,11 @@ impl TableMutator for CompactMutator { new_snapshot.segments.append(&mut merged_segments); new_snapshot.summary = merge_statistics(&self.summary, &merged_summary)?; - // write down the new snapshot - let snapshot_loc = self.location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, table_info, - snapshot_loc, + &self.location_generator, new_snapshot, ) .await diff --git a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs index 99591c9a3d2bc..d4ed5ad5ef5fc 100644 --- a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs @@ -50,7 +50,7 @@ impl DeletionMutator { }) } - pub async fn into_new_snapshot(self) -> Result<(TableSnapshot, String)> { + pub async fn into_new_snapshot(self) -> Result { let (segments, summary) = self.base_mutator.generate_segments().await?; self.base_mutator.into_new_snapshot(segments, summary).await } diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index f96540694ad55..b71d2c7be97cd 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -218,10 +218,16 @@ impl TableMutator for ReclusterMutator { segments.append(&mut merged_segments); summary = merge_statistics(&summary, &merged_summary)?; - let (new_snapshot, loc) = base_mutator.into_new_snapshot(segments, summary).await?; - - FuseTable::commit_to_meta_server(ctx.as_ref(), catalog_name, table_info, loc, new_snapshot) - .await?; + let new_snapshot = base_mutator.into_new_snapshot(segments, summary).await?; + + FuseTable::commit_to_meta_server( + ctx.as_ref(), + catalog_name, + table_info, + &self.base_mutator.location_generator, + new_snapshot, + ) + .await?; Ok(()) } } From 7041eb6c275a70bfee26a96495e2313f3d13f821 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 21:21:26 +0800 Subject: [PATCH 3/6] fix: should commit new table meta to meta store --- src/query/storages/fuse/src/fuse_table.rs | 19 +++++++++++-------- .../storages/fuse/src/operations/commit.rs | 3 +-- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 2bac2120c4800..c88728baef9fc 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -234,9 +234,8 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; - let new_snapshot_id = Uuid::new_v4(); let new_snapshot = TableSnapshot::new( - new_snapshot_id, + Uuid::new_v4(), &prev_timestamp, prev_snapshot_id, schema, @@ -245,15 +244,17 @@ impl Table for FuseTable { cluster_key_meta, ); + let mut table_info = self.table_info.clone(); + table_info.meta = new_table_meta; + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &self.table_info, + &table_info, &self.meta_location_generator, new_snapshot, ) - .await?; - Ok(()) + .await } async fn drop_table_cluster_keys( @@ -281,9 +282,8 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; - let new_snapshot_id = Uuid::new_v4(); let new_snapshot = TableSnapshot::new( - new_snapshot_id, + Uuid::new_v4(), &prev_timestamp, prev_snapshot_id, schema, @@ -292,10 +292,13 @@ impl Table for FuseTable { None, ); + let mut table_info = self.table_info.clone(); + table_info.meta = new_table_meta; + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &self.table_info, + &table_info, &self.meta_location_generator, new_snapshot, ) diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 22eb3a84a2c6e..ab444e5f7fab6 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -181,10 +181,9 @@ impl FuseTable { .map(|loc| (loc, SegmentInfo::VERSION)) .collect(); - let new_snapshot_id = Uuid::new_v4(); let new_snapshot = if overwrite { TableSnapshot::new( - new_snapshot_id, + Uuid::new_v4(), &prev_timestamp, prev.as_ref().map(|v| (v.snapshot_id, prev_version)), schema, From 9cce669a3ef90bd0efd952cafa1961665bf34a57 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 21:42:03 +0800 Subject: [PATCH 4/6] write down last snapshot hint --- .../storages/fuse/src/io/write/meta_writer.rs | 2 +- .../storages/fuse/src/operations/commit.rs | 29 ++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 5bcf9d3679c4e..f475418c1aaf3 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -22,7 +22,7 @@ use tracing::warn; use crate::io::retry; use crate::io::retry::Retryable; -pub async fn write_meta(data_accessor: &Operator, location: &str, meta: &T) -> Result<()> +pub async fn write_meta(data_accessor: &Operator, location: &str, meta: T) -> Result<()> where T: Serialize { let op = || async { let bs = serde_json::to_vec(&meta).map_err(Error::other)?; diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index ab444e5f7fab6..a11e09310da7f 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -36,6 +36,7 @@ use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableStatistics; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; +use opendal::Operator; use tracing::debug; use tracing::info; use tracing::warn; @@ -309,8 +310,11 @@ impl FuseTable { Ok(_) => { if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { let cache = &mut snapshot_cache.write().await; - cache.put(snapshot_location, Arc::new(snapshot)); + cache.put(snapshot_location.clone(), Arc::new(snapshot)); } + // try keep a hit file of last snapshot + Self::write_last_snapshot_hint(&operator, location_generator, snapshot_location) + .await; Ok(()) } Err(e) => { @@ -366,9 +370,26 @@ impl FuseTable { } // Left a hint file which indicates the location of the latest snapshot - // async fn write_last_snapshot_hint(_ctx: &dyn TableContext, _last_snapshot_path: String) { - // todo!() - //} + async fn write_last_snapshot_hint( + operator: &Operator, + location_generator: &TableMetaLocationGenerator, + last_snapshot_path: String, + ) { + let hint_path = location_generator.gen_last_snapshot_hint_location(); + // Just try our best to write down the hint file of last snapshot + // - will retry in the case of temporary failure + // but + // - errors are ignored if writing is eventually failed + // - errors (if any) will not be propagated to caller + // - "data race" ignored + // if multiple different versions of hints are written concurrently + // it is NOT guaranteed that the latest version will be kept + write_meta(operator, &hint_path, last_snapshot_path) + .await + .unwrap_or_else(|e| { + tracing::warn!("write last snapshot hint failure. {}", e); + }) + } } mod utils { From 224023ac97e0fa3b75ccf8dc834b65cde20f66c4 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 22:43:53 +0800 Subject: [PATCH 5/6] add unit test --- .../it/storages/fuse/operations/commit.rs | 40 +++++++++++++++++++ src/query/storages/fuse/src/constants.rs | 1 + src/query/storages/fuse/src/io/locations.rs | 5 ++- .../storages/fuse/src/operations/commit.rs | 37 ++++++++++++++--- 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index a2a2a94209c39..10459433614f0 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. use common_base::base::tokio; +use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_exception::Result; +use common_storages_fuse::FuseTable; use futures::TryStreamExt; use crate::storages::fuse::table_test_fixture::execute_query; @@ -76,3 +78,41 @@ async fn test_fuse_occ_retry() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_last_snapshot_hint() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + fixture.create_default_table().await?; + + let table = fixture.latest_default_table().await?; + + let num_blocks = 1; + let rows_per_block = 1; + let value_start_from = 1; + let stream = + TestFixture::gen_sample_blocks_stream_ex(num_blocks, rows_per_block, value_start_from); + + let blocks = stream.try_collect().await?; + fixture + .append_commit_blocks(table.clone(), blocks, false, true) + .await?; + + // check last snapshot hit file + let table = fixture.latest_default_table().await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let last_snapshot_location = fuse_table.snapshot_loc().unwrap(); + let operator = ctx.get_storage_operator()?; + let location = fuse_table + .meta_location_generator() + .gen_last_snapshot_hint_location(); + let storage_meta_data = operator.metadata(); + let storage_prefix = storage_meta_data.root(); + + let expected = format!("{}{}", storage_prefix, last_snapshot_location); + let content = operator.object(location.as_str()).read().await?; + + assert_eq!(content.as_slice(), expected.as_bytes()); + + Ok(()) +} diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs index 63f4a8463f60e..db881ba2ca99a 100644 --- a/src/query/storages/fuse/src/constants.rs +++ b/src/query/storages/fuse/src/constants.rs @@ -21,6 +21,7 @@ pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b"; pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i"; pub const FUSE_TBL_SEGMENT_PREFIX: &str = "_sg"; pub const FUSE_TBL_SNAPSHOT_PREFIX: &str = "_ss"; +pub const FUSE_TBL_LAST_SNAPSHOT_HINT: &str = "last_snapshot_location_hint"; pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000; pub const DEFAULT_BLOCK_SIZE_IN_MEM_SIZE_THRESHOLD: usize = 100 * 1024 * 1024; diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index d5980f958d55b..900b3af43c54f 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -27,6 +27,7 @@ use crate::constants::FUSE_TBL_BLOCK_PREFIX; use crate::constants::FUSE_TBL_SEGMENT_PREFIX; use crate::constants::FUSE_TBL_SNAPSHOT_PREFIX; use crate::FUSE_TBL_BLOCK_INDEX_PREFIX; +use crate::FUSE_TBL_LAST_SNAPSHOT_HINT; static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData); static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData); @@ -101,8 +102,8 @@ impl TableMetaLocationGenerator { pub fn gen_last_snapshot_hint_location(&self) -> String { format!( - "{}/{}/last_snapshot_location_hint", - &self.prefix, FUSE_TBL_SNAPSHOT_PREFIX, + "{}/{}/{}", + &self.prefix, FUSE_TBL_SNAPSHOT_PREFIX, FUSE_TBL_LAST_SNAPSHOT_HINT, ) } } diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index a11e09310da7f..9a23c12336e26 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -36,6 +36,8 @@ use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableStatistics; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; +use common_storages_util::retry; +use common_storages_util::retry::Retryable; use opendal::Operator; use tracing::debug; use tracing::info; @@ -375,7 +377,6 @@ impl FuseTable { location_generator: &TableMetaLocationGenerator, last_snapshot_path: String, ) { - let hint_path = location_generator.gen_last_snapshot_hint_location(); // Just try our best to write down the hint file of last snapshot // - will retry in the case of temporary failure // but @@ -384,11 +385,35 @@ impl FuseTable { // - "data race" ignored // if multiple different versions of hints are written concurrently // it is NOT guaranteed that the latest version will be kept - write_meta(operator, &hint_path, last_snapshot_path) - .await - .unwrap_or_else(|e| { - tracing::warn!("write last snapshot hint failure. {}", e); - }) + + let hint_path = location_generator.gen_last_snapshot_hint_location(); + let op = || { + let hint_path = hint_path.clone(); + let last_snapshot_path = { + let operator_meta_data = operator.metadata(); + let storage_prefix = operator_meta_data.root(); + format!("{}{}", storage_prefix, last_snapshot_path) + }; + + async move { + operator + .object(&hint_path) + .write(last_snapshot_path) + .await + .map_err(retry::from_io_error) + } + }; + + let notify = |e: std::io::Error, duration| { + warn!( + "transient error encountered while writing last snapshot hint file, location {}, at duration {:?} : {}", + hint_path, duration, e, + ) + }; + + op.retry_with_notify(notify).await.unwrap_or_else(|e| { + tracing::warn!("write last snapshot hint failure. {}", e); + }) } } From beabab9c855dcd5b88db996075a271f6a7b59443 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 31 Aug 2022 23:36:45 +0800 Subject: [PATCH 6/6] adjust last snapshot hint path --- src/query/storages/fuse/src/io/locations.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 900b3af43c54f..41c036c815dde 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -101,10 +101,7 @@ impl TableMetaLocationGenerator { } pub fn gen_last_snapshot_hint_location(&self) -> String { - format!( - "{}/{}/{}", - &self.prefix, FUSE_TBL_SNAPSHOT_PREFIX, FUSE_TBL_LAST_SNAPSHOT_HINT, - ) + format!("{}/{}", &self.prefix, FUSE_TBL_LAST_SNAPSHOT_HINT) } }