diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index a2a2a94209c39..10459433614f0 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. use common_base::base::tokio; +use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_exception::Result; +use common_storages_fuse::FuseTable; use futures::TryStreamExt; use crate::storages::fuse::table_test_fixture::execute_query; @@ -76,3 +78,41 @@ async fn test_fuse_occ_retry() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_last_snapshot_hint() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + fixture.create_default_table().await?; + + let table = fixture.latest_default_table().await?; + + let num_blocks = 1; + let rows_per_block = 1; + let value_start_from = 1; + let stream = + TestFixture::gen_sample_blocks_stream_ex(num_blocks, rows_per_block, value_start_from); + + let blocks = stream.try_collect().await?; + fixture + .append_commit_blocks(table.clone(), blocks, false, true) + .await?; + + // check last snapshot hint file + let table = fixture.latest_default_table().await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let last_snapshot_location = fuse_table.snapshot_loc().unwrap(); + let operator = ctx.get_storage_operator()?; + let location = fuse_table + .meta_location_generator() + .gen_last_snapshot_hint_location(); + let storage_meta_data = operator.metadata(); + let storage_prefix = storage_meta_data.root(); + + let expected = format!("{}{}", storage_prefix, last_snapshot_location); + let content = operator.object(location.as_str()).read().await?; + + assert_eq!(content.as_slice(), expected.as_bytes()); + + Ok(()) 
+} diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs index a3895f9c26bc7..a50571a733c6f 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs @@ -110,7 +110,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> { } } - let (new_snapshot, _) = mutator.into_new_snapshot().await?; + let new_snapshot = mutator.into_new_snapshot().await?; // half segments left after deletion assert_eq!(new_snapshot.segments.len(), 50); diff --git a/src/query/storages/fuse-meta/src/meta/versions.rs b/src/query/storages/fuse-meta/src/meta/versions.rs index e8413ce3bae2b..6c321b4ce7d63 100644 --- a/src/query/storages/fuse-meta/src/meta/versions.rs +++ b/src/query/storages/fuse-meta/src/meta/versions.rs @@ -30,7 +30,7 @@ use crate::meta::Versioned; // impl Versioned<0> for v0::SegmentInfo {} // impl Versioned<1> for v0::SegmentInfo {} // -// Fortunately, since v0::SegmentInfo::VESION is used in +// Fortunately, since v0::SegmentInfo::VERSION is used in // several places, compiler will report compile error if it // can not deduce a unique value the constant expression. 
diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs index 63f4a8463f60e..db881ba2ca99a 100644 --- a/src/query/storages/fuse/src/constants.rs +++ b/src/query/storages/fuse/src/constants.rs @@ -21,6 +21,7 @@ pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b"; pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i"; pub const FUSE_TBL_SEGMENT_PREFIX: &str = "_sg"; pub const FUSE_TBL_SNAPSHOT_PREFIX: &str = "_ss"; +pub const FUSE_TBL_LAST_SNAPSHOT_HINT: &str = "last_snapshot_location_hint"; pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000; pub const DEFAULT_BLOCK_SIZE_IN_MEM_SIZE_THRESHOLD: usize = 100 * 1024 * 1024; diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 4182a2434f04f..c88728baef9fc 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -16,23 +16,18 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use common_cache::Cache; use common_catalog::catalog::StorageDescription; use common_catalog::table_context::TableContext; use common_catalog::table_mutator::TableMutator; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; -use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::ClusterKey; use common_fuse_meta::meta::Statistics as FuseStatistics; use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; use common_legacy_parser::ExpressionParser; use common_meta_app::schema::TableInfo; -use common_meta_app::schema::TableMeta; -use common_meta_app::schema::UpdateTableMetaReq; -use common_meta_types::MatchSeq; use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; @@ -43,7 +38,6 @@ use common_planners::TruncateTablePlan; use common_storages_util::storage_context::StorageContext; use uuid::Uuid; -use crate::io::write_meta; use crate::io::BlockCompactor; use crate::io::MetaReaders; 
use crate::io::TableMetaLocationGenerator; @@ -174,53 +168,6 @@ impl FuseTable { } } - pub async fn update_table_meta( - &self, - ctx: &dyn TableContext, - catalog_name: &str, - snapshot: &TableSnapshot, - meta: &mut TableMeta, - ) -> Result<()> { - let uuid = snapshot.snapshot_id; - let snapshot_loc = self - .meta_location_generator() - .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; - let operator = ctx.get_storage_operator()?; - write_meta(&operator, &snapshot_loc, snapshot).await?; - - // set new snapshot location - meta.options - .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone()); - // remove legacy options - meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); - - let table_id = self.table_info.ident.table_id; - let table_version = self.table_info.ident.seq; - let req = UpdateTableMetaReq { - table_id, - seq: MatchSeq::Exact(table_version), - new_table_meta: meta.clone(), - }; - - let catalog = ctx.get_catalog(catalog_name)?; - let result = catalog.update_table_meta(req).await; - match result { - Ok(_) => { - if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { - let cache = &mut snapshot_cache.write().await; - cache.put(snapshot_loc, Arc::new(snapshot.clone())); - } - Ok(()) - } - Err(e) => { - // commit snapshot to meta server failed, try to delete it. 
- // "major GC" will collect this, if deletion failure (even after DAL retried) - let _ = operator.object(&snapshot_loc).delete().await; - Err(e) - } - } - } - pub fn transient(&self) -> bool { self.table_info.meta.options.contains_key("TRANSIENT") } @@ -297,11 +244,15 @@ impl Table for FuseTable { cluster_key_meta, ); - self.update_table_meta( + let mut table_info = self.table_info.clone(); + table_info.meta = new_table_meta; + + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &table_info, + &self.meta_location_generator, + new_snapshot, ) .await } @@ -341,11 +292,15 @@ impl Table for FuseTable { None, ); - self.update_table_meta( + let mut table_info = self.table_info.clone(); + table_info.meta = new_table_meta; + + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &table_info, + &self.meta_location_generator, + new_snapshot, ) .await } diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 4b03349de9203..41c036c815dde 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -27,6 +27,7 @@ use crate::constants::FUSE_TBL_BLOCK_PREFIX; use crate::constants::FUSE_TBL_SEGMENT_PREFIX; use crate::constants::FUSE_TBL_SNAPSHOT_PREFIX; use crate::FUSE_TBL_BLOCK_INDEX_PREFIX; +use crate::FUSE_TBL_LAST_SNAPSHOT_HINT; static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData); static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData); @@ -75,7 +76,7 @@ impl TableMetaLocationGenerator { ) } - pub fn gen_segment_info_location(&self) -> String where { + pub fn gen_segment_info_location(&self) -> String { let segment_uuid = Uuid::new_v4().simple().to_string(); format!( "{}/{}/{}_v{}.json", @@ -98,6 +99,10 @@ impl TableMetaLocationGenerator { SNAPSHOT_V0.version() } } + + pub fn gen_last_snapshot_hint_location(&self) -> String { + 
format!("{}/{}", &self.prefix, FUSE_TBL_LAST_SNAPSHOT_HINT) + } } trait SnapshotLocationCreator { diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 5bcf9d3679c4e..f475418c1aaf3 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -22,7 +22,7 @@ use tracing::warn; use crate::io::retry; use crate::io::retry::Retryable; -pub async fn write_meta(data_accessor: &Operator, location: &str, meta: &T) -> Result<()> +pub async fn write_meta(data_accessor: &Operator, location: &str, meta: T) -> Result<()> where T: Serialize { let op = || async { let bs = serde_json::to_vec(&meta).map_err(Error::other)?; diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 2ba7c1e1e2fa1..9a23c12336e26 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -19,11 +19,13 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; use common_base::base::ProgressValues; +use common_cache::Cache; use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; +use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::ClusterKey; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SegmentInfo; @@ -32,14 +34,18 @@ use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableStatistics; -use common_meta_app::schema::UpdateTableMetaReply; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; +use common_storages_util::retry; +use common_storages_util::retry::Retryable; +use opendal::Operator; use tracing::debug; use 
tracing::info; use tracing::warn; use uuid::Uuid; +use crate::io::write_meta; +use crate::io::TableMetaLocationGenerator; use crate::operations::AppendOperationLogEntry; use crate::operations::TableOperationLog; use crate::statistics; @@ -177,6 +183,7 @@ impl FuseTable { .into_iter() .map(|loc| (loc, SegmentInfo::VERSION)) .collect(); + let new_snapshot = if overwrite { TableSnapshot::new( Uuid::new_v4(), @@ -207,11 +214,12 @@ impl FuseTable { index_data_bytes: new_snapshot.summary.index_size, }; - self.update_table_meta( + FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, - &new_snapshot, - &mut new_table_meta, + &self.table_info, + &self.meta_location_generator, + new_snapshot, ) .await } @@ -256,24 +264,28 @@ impl FuseTable { ctx: &dyn TableContext, catalog_name: &str, table_info: &TableInfo, - new_snapshot_location: String, - stats: &Statistics, - ) -> Result { - let catalog = ctx.get_catalog(catalog_name)?; + location_generator: &TableMetaLocationGenerator, + snapshot: TableSnapshot, + ) -> Result<()> { + let snapshot_location = location_generator + .snapshot_location_from_uuid(&snapshot.snapshot_id, snapshot.format_version())?; - let table_id = table_info.ident.table_id; - let table_version = table_info.ident.seq; + // 1. write down snapshot + let operator = ctx.get_storage_operator()?; + write_meta(&operator, &snapshot_location, &snapshot).await?; + // 2. 
prepare table meta let mut new_table_meta = table_info.meta.clone(); - - // set new snapshot location - new_table_meta - .options - .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), new_snapshot_location); - + // 2.1 set new snapshot location + new_table_meta.options.insert( + OPT_KEY_SNAPSHOT_LOCATION.to_owned(), + snapshot_location.clone(), + ); // remove legacy options self::utils::remove_legacy_options(&mut new_table_meta.options); + // 2.2 setup table statistics + let stats = &snapshot.summary; // update statistics new_table_meta.statistics = TableStatistics { number_of_rows: stats.row_count, @@ -282,13 +294,38 @@ impl FuseTable { index_data_bytes: stats.index_size, }; + // 3. prepare the request + + let catalog = ctx.get_catalog(catalog_name)?; + let table_id = table_info.ident.table_id; + let table_version = table_info.ident.seq; + let req = UpdateTableMetaReq { table_id, seq: MatchSeq::Exact(table_version), new_table_meta, }; - catalog.update_table_meta(req).await + // 4. let's roll + let reply = catalog.update_table_meta(req).await; + match reply { + Ok(_) => { + if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() { + let cache = &mut snapshot_cache.write().await; + cache.put(snapshot_location.clone(), Arc::new(snapshot)); + } + // try to keep a hint file of the last snapshot + Self::write_last_snapshot_hint(&operator, location_generator, snapshot_location) + .await; + Ok(()) + } + Err(e) => { + // commit snapshot to meta server failed, try to delete it. 
+ // "major GC" will collect this, if deletion failure (even after DAL retried) + let _ = operator.object(&snapshot_location).delete().await; + Err(e) + } + } } pub fn merge_append_operations( @@ -333,6 +370,51 @@ impl FuseTable { }; catalog.get_table_by_info(&table_info) } + + // Leave a hint file that indicates the location of the latest snapshot + async fn write_last_snapshot_hint( + operator: &Operator, + location_generator: &TableMetaLocationGenerator, + last_snapshot_path: String, + ) { + // Just try our best to write down the hint file of last snapshot + // - will retry in the case of temporary failure + // but + // - errors are ignored if writing eventually fails + // - errors (if any) will not be propagated to caller + // - "data race" ignored + // if multiple different versions of hints are written concurrently + // it is NOT guaranteed that the latest version will be kept + + let hint_path = location_generator.gen_last_snapshot_hint_location(); + let op = || { + let hint_path = hint_path.clone(); + let last_snapshot_path = { + let operator_meta_data = operator.metadata(); + let storage_prefix = operator_meta_data.root(); + format!("{}{}", storage_prefix, last_snapshot_path) + }; + + async move { + operator + .object(&hint_path) + .write(last_snapshot_path) + .await + .map_err(retry::from_io_error) + } + }; + + let notify = |e: std::io::Error, duration| { + warn!( + "transient error encountered while writing last snapshot hint file, location {}, at duration {:?} : {}", + hint_path, duration, e, + ) + }; + + op.retry_with_notify(notify).await.unwrap_or_else(|e| { + tracing::warn!("write last snapshot hint failure. 
{}", e); + }) + } } mod utils { diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 7b917ba4d20cc..d1d1dfce85977 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -125,13 +125,13 @@ impl FuseTable { del_holder: DeletionMutator, catalog_name: &str, ) -> Result<()> { - let (new_snapshot, loc) = del_holder.into_new_snapshot().await?; + let new_snapshot = del_holder.into_new_snapshot().await?; Self::commit_to_meta_server( ctx, catalog_name, self.get_table_info(), - loc, - &new_snapshot.summary, + &self.meta_location_generator, + new_snapshot, ) .await?; // TODO check if error is recoverable, and try to resolve the conflict diff --git a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs index 7169d0badeb26..fd3df6dca1f15 100644 --- a/src/query/storages/fuse/src/operations/mutation/base_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/base_mutator.rs @@ -26,7 +26,6 @@ use common_fuse_meta::meta::Statistics; use common_fuse_meta::meta::TableSnapshot; use opendal::Operator; -use crate::io::write_meta; use crate::io::MetaReaders; use crate::io::SegmentWriter; use crate::io::TableMetaLocationGenerator; @@ -86,18 +85,12 @@ impl BaseMutator { self, segments: Vec, summary: Statistics, - ) -> Result<(TableSnapshot, String)> { + ) -> Result { let snapshot = self.base_snapshot; let mut new_snapshot = TableSnapshot::from_previous(&snapshot); new_snapshot.segments = segments; new_snapshot.summary = summary; - // write down the new snapshot - let snapshot_loc = self.location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?; - Ok((new_snapshot, snapshot_loc)) + Ok(new_snapshot) } pub async fn generate_segments(&self) -> 
Result<(Vec, Statistics)> { diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs index c506b5fd1a6ab..1a721a30235d7 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator.rs @@ -25,7 +25,6 @@ use common_fuse_meta::meta::Versioned; use common_meta_app::schema::TableInfo; use opendal::Operator; -use crate::io::write_meta; use crate::io::BlockCompactor; use crate::io::MetaReaders; use crate::io::SegmentWriter; @@ -178,21 +177,13 @@ impl TableMutator for CompactMutator { new_snapshot.segments.append(&mut merged_segments); new_snapshot.summary = merge_statistics(&self.summary, &merged_summary)?; - // write down the new snapshot - let snapshot_loc = self.location_generator.snapshot_location_from_uuid( - &new_snapshot.snapshot_id, - new_snapshot.format_version(), - )?; - write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?; - FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, table_info, - snapshot_loc, - &new_snapshot.summary, + &self.location_generator, + new_snapshot, ) - .await?; - Ok(()) + .await } } diff --git a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs index 99591c9a3d2bc..d4ed5ad5ef5fc 100644 --- a/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/deletion_mutator.rs @@ -50,7 +50,7 @@ impl DeletionMutator { }) } - pub async fn into_new_snapshot(self) -> Result<(TableSnapshot, String)> { + pub async fn into_new_snapshot(self) -> Result { let (segments, summary) = self.base_mutator.generate_segments().await?; self.base_mutator.into_new_snapshot(segments, summary).await } diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs 
b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index 2b9340864b11b..b71d2c7be97cd 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -218,14 +218,14 @@ impl TableMutator for ReclusterMutator { segments.append(&mut merged_segments); summary = merge_statistics(&summary, &merged_summary)?; - let (new_snapshot, loc) = base_mutator.into_new_snapshot(segments, summary).await?; + let new_snapshot = base_mutator.into_new_snapshot(segments, summary).await?; FuseTable::commit_to_meta_server( ctx.as_ref(), catalog_name, table_info, - loc, - &new_snapshot.summary, + &self.base_mutator.location_generator, + new_snapshot, ) .await?; Ok(())