From 21a1d06747bb604f104555fb045f98112e3c2e2c Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Mon, 10 Oct 2022 16:43:17 +0800 Subject: [PATCH 01/10] Add read_snapshot to spawn worker --- Cargo.lock | 1 + src/query/storages/fuse/Cargo.toml | 1 + src/query/storages/fuse/src/fuse_snapshot.rs | 47 ++++++++++++++++++++ src/query/storages/fuse/src/lib.rs | 1 + src/query/storages/fuse/src/operations/gc.rs | 29 +++++++----- 5 files changed, 68 insertions(+), 11 deletions(-) create mode 100644 src/query/storages/fuse/src/fuse_snapshot.rs diff --git a/Cargo.lock b/Cargo.lock index a33b57731be25..af9fa24946b2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2012,6 +2012,7 @@ dependencies = [ name = "common-storages-fuse" version = "0.1.0" dependencies = [ + "async-channel", "async-trait-fn", "backoff", "chrono", diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index d88bb65d1e8b6..e0983a752aed7 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -34,6 +34,7 @@ common-storages-index = { path = "../index" } common-storages-util = { path = "../util" } common-streams = { path = "../../streams" } +async-channel = "1.7.1" async-trait = { version = "0.1.57", package = "async-trait-fn" } backoff = { version = "0.4.0", features = ["futures", "tokio"] } chrono = "0.4.22" diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs new file mode 100644 index 0000000000000..c5a027528955d --- /dev/null +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -0,0 +1,47 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_channel::Receiver; +use common_catalog::table_context::TableContext; +use common_exception::Result; +use common_fuse_meta::meta::TableSnapshot; +use futures_util::TryStreamExt; + +use crate::io::MetaReaders; +use crate::io::SnapshotHistoryReader; +use crate::io::TableMetaLocationGenerator; + +#[tracing::instrument(level = "debug", skip_all)] +pub async fn read_snapshots( + ctx: Arc, + location: String, + format_version: u64, + location_gen: TableMetaLocationGenerator, +) -> Result>> { + let (tx, rx) = async_channel::bounded(100); + let reader = MetaReaders::table_snapshot_reader(ctx); + let mut snapshot_history = reader.snapshot_history(location, format_version, location_gen); + + common_base::base::tokio::spawn(async move { + while let Ok(Some(s)) = snapshot_history.try_next().await { + if let Err(_cause) = tx.send(s).await { + break; + } + } + }); + + Ok(rx) +} diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index f9f3b7dd9bda6..de4a29b5a0d72 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -20,6 +20,7 @@ mod constants; mod fuse_lazy_part; mod fuse_part; mod fuse_segment; +mod fuse_snapshot; mod fuse_table; pub mod io; pub mod operations; diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index 8199eea8364c0..61a96e648aa5a 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -23,13 +23,13 @@ use common_exception::Result; use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SnapshotId; -use futures::TryStreamExt; +use common_fuse_meta::meta::TableSnapshot; +use futures_util::StreamExt; use opendal::Operator; use tracing::warn; use crate::fuse_segment::read_segments; -use crate::io::MetaReaders; -use crate::io::SnapshotHistoryReader; +use crate::fuse_snapshot::read_snapshots; use crate::FuseTable; impl FuseTable { @@ -57,8 +57,6 @@ impl FuseTable { return Ok(()); }; - let reader = MetaReaders::table_snapshot_reader(ctx.clone()); - let (prev_id, prev_ver) = if let Some((id, ver)) = last_snapshot.prev_snapshot_id { (id, ver) } else { @@ -84,9 +82,6 @@ impl FuseTable { .meta_location_generator .snapshot_location_from_uuid(&prev_id, prev_ver)?; - let mut snapshot_history = - reader.snapshot_history(prev_loc, prev_ver, self.meta_location_generator.clone()); - let mut snapshots_to_be_deleted: Vec<_> = Vec::new(); if !keep_last_snapshot { snapshots_to_be_deleted @@ -111,9 +106,21 @@ impl FuseTable { // collects // - all the previous snapshots // - segments referenced by previous snapshots, but not by gc_root - while let Some(s) = snapshot_history.try_next().await? { - snapshots_to_be_deleted.push((s.snapshot_id, s.format_version())); - for seg in &s.segments { + let mut snapshots = read_snapshots( + ctx.clone(), + prev_loc, + prev_ver, + self.meta_location_generator.clone(), + ) + .await?; + + while let ss = snapshots + .recv() + .await + .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))? + { + snapshots_to_be_deleted.push((ss.snapshot_id, ss.format_version())); + for seg in &ss.segments { if !segments_referenced_by_gc_root.contains(seg) { segments_to_be_deleted.insert(seg.clone()); } From b50a706d6da7a300486a8c51230c63bd9bbc31f5 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 14:42:05 +0800 Subject: [PATCH 02/10] Change fuse_snapshot from sync to asyncsource --- Cargo.lock | 2 - .../src/procedures/systems/fuse_snapshot.rs | 24 ++-- src/query/storages/fuse/Cargo.toml | 2 - src/query/storages/fuse/src/fuse_snapshot.rs | 91 ++++++++++++--- src/query/storages/fuse/src/operations/gc.rs | 29 ++--- .../fuse_segments/fuse_segment_table.rs | 2 +- .../fuse_snapshots/fuse_snapshot.rs | 105 ++++-------------- .../fuse_snapshots/fuse_snapshot_table.rs | 90 +++++++++++---- 8 files changed, 184 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af9fa24946b2f..08bffad36886c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2012,7 +2012,6 @@ dependencies = [ name = "common-storages-fuse" version = "0.1.0" dependencies = [ - "async-channel", "async-trait-fn", "backoff", "chrono", @@ -2036,7 +2035,6 @@ dependencies = [ "common-storage", "common-storages-index", "common-storages-util", - "common-streams", "futures", "futures-util", "opendal", diff --git a/src/query/service/src/procedures/systems/fuse_snapshot.rs b/src/query/service/src/procedures/systems/fuse_snapshot.rs index c46ff8c8395e8..d68d5a2bee148 100644 --- a/src/query/service/src/procedures/systems/fuse_snapshot.rs +++ b/src/query/service/src/procedures/systems/fuse_snapshot.rs @@ -14,11 +14,12 @@ use std::sync::Arc; +use common_datablocks::DataBlock; use common_datavalues::DataSchema; use common_exception::Result; -use common_streams::SendableDataBlockStream; +use common_storages_fuse::FuseTable; -use crate::procedures::procedure::StreamProcedure; +use crate::procedures::OneBlockProcedure; use crate::procedures::Procedure; use crate::procedures::ProcedureFeatures; use crate::sessions::QueryContext; @@ -35,7 +36,7 @@ impl FuseSnapshotProcedure { } #[async_trait::async_trait] -impl StreamProcedure for FuseSnapshotProcedure { +impl OneBlockProcedure for FuseSnapshotProcedure { fn name(&self) -> &str { "FUSE_SNAPSHOT" } @@ -44,19 +45,12 @@ impl StreamProcedure for FuseSnapshotProcedure { ProcedureFeatures::default().variadic_arguments(2, 3) } - async fn data_stream( - &self, - ctx: Arc, - args: Vec, - ) -> Result { - let catalog_name = ctx.get_current_catalog(); - let tenant_id = ctx.get_tenant(); + async fn all_data(&self, ctx: Arc, args: Vec) -> Result { let database_name = args[0].clone(); let table_name = args[1].clone(); - - let limit = args.get(2).map(|arg| arg.parse::()).transpose()?; + let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&catalog_name)? + .get_catalog(&ctx.get_current_catalog())? .get_table( tenant_id.as_str(), database_name.as_str(), @@ -64,7 +58,9 @@ impl StreamProcedure for FuseSnapshotProcedure { ) .await?; - FuseSnapshot::new(ctx, tbl).get_history_stream_as_blocks(limit) + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + + Ok(FuseSnapshot::new(ctx, tbl).get_snapshots().await?) } fn schema(&self) -> Arc { diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index e0983a752aed7..9c19d3c58d2df 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -32,9 +32,7 @@ common-pipeline-transforms = { path = "../../pipeline/transforms" } common-storage = { path = "../../../common/storage" } common-storages-index = { path = "../index" } common-storages-util = { path = "../util" } -common-streams = { path = "../../streams" } -async-channel = "1.7.1" async-trait = { version = "0.1.57", package = "async-trait-fn" } backoff = { version = "0.4.0", features = ["futures", "tokio"] } chrono = "0.4.22" diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index c5a027528955d..272891433961a 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -12,36 +12,97 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::Path; use std::sync::Arc; -use async_channel::Receiver; +use common_base::base::tokio::sync::Semaphore; +use common_base::base::Runtime; use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; use common_exception::Result; use common_fuse_meta::meta::TableSnapshot; +use futures_util::future; use futures_util::TryStreamExt; +use opendal::ObjectMode; +use opendal::Operator; +use tracing::warn; +use tracing::Instrument; use crate::io::MetaReaders; -use crate::io::SnapshotHistoryReader; -use crate::io::TableMetaLocationGenerator; -#[tracing::instrument(level = "debug", skip_all)] -pub async fn read_snapshots( +async fn read_snapshot( ctx: Arc, - location: String, + snapshot_location: String, format_version: u64, - location_gen: TableMetaLocationGenerator, -) -> Result>> { - let (tx, rx) = async_channel::bounded(100); +) -> Result> { let reader = MetaReaders::table_snapshot_reader(ctx); - let mut snapshot_history = reader.snapshot_history(location, format_version, location_gen); + reader.read(snapshot_location, None, format_version).await +} - common_base::base::tokio::spawn(async move { - while let Ok(Some(s)) = snapshot_history.try_next().await { - if let Err(_cause) = tx.send(s).await { - break; +#[tracing::instrument(level = "debug", skip_all)] +pub async fn read_snapshots_by_root_file( + ctx: Arc, + root_snapshot_file: String, + format_version: u64, + data_accessor: &Operator, +) -> Result>>> { + let mut snapshot_files = vec![]; + if let Some(path) = Path::new(&root_snapshot_file).parent() { + let snapshot_prefix = path.to_str().unwrap_or(""); + if snapshot_prefix.contains('/') { + let mut ds = data_accessor.object(snapshot_prefix).list().await?; + // ObjectStreamer implements `futures::Stream` + while let Some(de) = ds.try_next().await? { + match de.mode() { + ObjectMode::FILE => { + snapshot_files.push(de.path().to_string()); + } + _ => { + warn!( + "find not snapshot file in {:}, found: {:?}", + snapshot_prefix, de + ); + continue; + } + } } } + } + + let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; + let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; + + // 1.1 combine all the tasks. + let mut iter = snapshot_files.iter(); + let tasks = std::iter::from_fn(move || { + if let Some(location) = iter.next() { + let ctx = ctx.clone(); + let location = location.clone(); + Some( + read_snapshot(ctx, location, format_version) + .instrument(tracing::debug_span!("read_snapshot")), + ) + } else { + None + } }); - Ok(rx) + // 1.2 build the runtime. + let semaphore = Arc::new(Semaphore::new(max_io_requests)); + let segments_runtime = Arc::new(Runtime::with_worker_threads( + max_runtime_threads, + Some("fuse-req-snapshots-worker".to_owned()), + )?); + + // 1.3 spawn all the tasks to the runtime. + let join_handlers = segments_runtime + .try_spawn_batch(semaphore.clone(), tasks) + .await?; + + // 1.4 get all the result. + let joint: Vec>> = future::try_join_all(join_handlers) + .instrument(tracing::debug_span!("read_snapshots_join_all")) + .await + .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))?; + Ok(joint) } diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index 61a96e648aa5a..8199eea8364c0 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -23,13 +23,13 @@ use common_exception::Result; use common_fuse_meta::caches::CacheManager; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SnapshotId; -use common_fuse_meta::meta::TableSnapshot; -use futures_util::StreamExt; +use futures::TryStreamExt; use opendal::Operator; use tracing::warn; use crate::fuse_segment::read_segments; -use crate::fuse_snapshot::read_snapshots; +use crate::io::MetaReaders; +use crate::io::SnapshotHistoryReader; use crate::FuseTable; impl FuseTable { @@ -57,6 +57,8 @@ impl FuseTable { return Ok(()); }; + let reader = MetaReaders::table_snapshot_reader(ctx.clone()); + let (prev_id, prev_ver) = if let Some((id, ver)) = last_snapshot.prev_snapshot_id { (id, ver) } else { @@ -82,6 +84,9 @@ impl FuseTable { .meta_location_generator .snapshot_location_from_uuid(&prev_id, prev_ver)?; + let mut snapshot_history = + reader.snapshot_history(prev_loc, prev_ver, self.meta_location_generator.clone()); + let mut snapshots_to_be_deleted: Vec<_> = Vec::new(); if !keep_last_snapshot { snapshots_to_be_deleted @@ -106,21 +111,9 @@ impl FuseTable { // collects // - all the previous snapshots // - segments referenced by previous snapshots, but not by gc_root - let mut snapshots = read_snapshots( - ctx.clone(), - prev_loc, - prev_ver, - self.meta_location_generator.clone(), - ) - .await?; - - while let ss = snapshots - .recv() - .await - .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))? - { - snapshots_to_be_deleted.push((ss.snapshot_id, ss.format_version())); - for seg in &ss.segments { + while let Some(s) = snapshot_history.try_next().await? { + snapshots_to_be_deleted.push((s.snapshot_id, s.format_version())); + for seg in &s.segments { if !segments_referenced_by_gc_root.contains(seg) { segments_to_be_deleted.insert(seg.clone()); } diff --git a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs index f212902f2b849..f5565f394c9c2 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs @@ -160,7 +160,7 @@ impl FuseHistorySource { #[async_trait::async_trait] impl AsyncSource for FuseHistorySource { - const NAME: &'static str = "fuse_snapshot"; + const NAME: &'static str = "fuse_segment"; #[async_trait::unboxed_simple] async fn generate(&mut self) -> Result> { diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index 70886a0ee1af5..06e77614abbdc 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -14,113 +14,46 @@ use std::sync::Arc; -use common_catalog::table::Table; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; use common_fuse_meta::meta::TableSnapshot; -use common_streams::DataBlockStream; -use common_streams::SendableDataBlockStream; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; -use crate::io::MetaReaders; -use crate::io::SnapshotHistoryReader; +use crate::fuse_snapshot::read_snapshots_by_root_file; use crate::io::TableMetaLocationGenerator; use crate::sessions::TableContext; use crate::FuseTable; -pub struct FuseSnapshot { +pub struct FuseSnapshot<'a> { pub ctx: Arc, - pub table: Arc, + pub table: &'a FuseTable, } -impl FuseSnapshot { - pub fn new(ctx: Arc, table: Arc) -> Self { +impl<'a> FuseSnapshot<'a> { + pub fn new(ctx: Arc, table: &'a FuseTable) -> Self { Self { ctx, table } } - /// Get table snapshot history as stream of data block - /// For cases that caller inside sync context, i.e. using async catalog api is not convenient - pub fn new_snapshot_history_stream( - ctx: Arc, - database_name: String, - table_name: String, - catalog_name: String, - limit: Option, - ) -> SendableDataBlockStream { - // prepare the future that resolved the table - let table_instance = { - let ctx = ctx.clone(); - async move { - let tenant_id = ctx.get_tenant(); - let tbl = ctx - .get_catalog(catalog_name.as_str())? - .get_table( - tenant_id.as_str(), - database_name.as_str(), - table_name.as_str(), - ) - .await?; - Ok(tbl) - } - }; - - // chain the future with a stream of snapshot, for the resolved table, as data blocks - let resolved_table = futures::stream::once(table_instance); - let stream = resolved_table.map(move |item: Result<_>| match item { - Ok(table) => { - let fuse_snapshot = FuseSnapshot::new(ctx.clone(), table); - Ok(fuse_snapshot.get_history_stream_as_blocks(limit)?) - } - Err(e) => Err(e), - }); - - // flat it into single stream of data blocks - stream.try_flatten().boxed() - } - - pub fn get_history_stream_as_blocks( - self, - limit: Option, - ) -> Result { - let tbl = FuseTable::try_from_table(self.table.as_ref())?; - let snapshot_location = tbl.snapshot_loc(); - let meta_location_generator = tbl.meta_location_generator.clone(); + pub async fn get_snapshots(self) -> Result { + let meta_location_generator = self.table.meta_location_generator.clone(); + let snapshot_location = self.table.snapshot_loc(); if let Some(snapshot_location) = snapshot_location { - let snapshot_version = tbl.snapshot_format_version(); - let snapshot_reader = MetaReaders::table_snapshot_reader(self.ctx.clone()); - let snapshot_stream = snapshot_reader.snapshot_history( + let snapshot_version = self.table.snapshot_format_version(); + let snapshots_results = read_snapshots_by_root_file( + self.ctx.clone(), snapshot_location, snapshot_version, - tbl.meta_location_generator().clone(), - ); - - // map snapshot to data block - let block_stream = snapshot_stream.map(move |snapshot| match snapshot { - Ok(snapshot) => Self::snapshots_to_block( - &meta_location_generator, - vec![snapshot], - snapshot_version, - ), - Err(e) => Err(e), - }); - - // limit if necessary - if let Some(limit) = limit { - Ok(block_stream.take(limit).boxed()) - } else { - Ok(block_stream.boxed()) - } - } else { - // to carries the schema info back to caller, instead of an empty stream, - // a stream of single empty block item returned - let data_block = DataBlock::empty_with_schema(FuseSnapshot::schema()); - Ok(DataBlockStream::create(FuseSnapshot::schema(), None, vec![data_block]).boxed()) + &self.table.operator, + ) + .await?; + let snapshots = snapshots_results.into_iter().flatten().collect(); + return self.to_block(&meta_location_generator, snapshots, snapshot_version); } + Ok(DataBlock::empty_with_schema(FuseSnapshot::schema())) } - fn snapshots_to_block( + fn to_block( + &self, location_generator: &TableMetaLocationGenerator, snapshots: Vec>, latest_snapshot_version: u64, diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs index e4655f6a284c3..d44b0cfddd481 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::sync::Arc; use common_catalog::catalog::CATALOG_DEFAULT; +use common_datablocks::DataBlock; use common_exception::Result; use common_legacy_expression::LegacyExpression; use common_legacy_planners::Extras; @@ -25,7 +26,9 @@ use common_legacy_planners::Statistics; use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; -use common_pipeline_sources::processors::sources::StreamSourceNoSkipEmpty; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_sources::processors::sources::AsyncSource; +use common_pipeline_sources::processors::sources::AsyncSourcer; use super::fuse_snapshot::FuseSnapshot; use super::table_args::parse_func_history_args; @@ -36,6 +39,7 @@ use crate::sessions::TableContext; use crate::table_functions::string_literal; use crate::table_functions::TableArgs; use crate::table_functions::TableFunction; +use crate::FuseTable; use crate::Table; const FUSE_FUNC_SNAPSHOT: &str = "fuse_snapshot"; @@ -74,10 +78,6 @@ impl FuseSnapshotTable { arg_table_name, })) } - - fn get_limit(plan: &ReadDataSourcePlan) -> Option { - plan.push_downs.as_ref().and_then(|extras| extras.limit) - } } #[async_trait::async_trait] @@ -108,29 +108,19 @@ impl Table for FuseSnapshotTable { fn read_data( &self, ctx: Arc, - plan: &ReadDataSourcePlan, + _: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { let output = OutputPort::create(); - let limit = Self::get_limit(plan); - let database_name = self.arg_database_name.to_owned(); - let table_name = self.arg_table_name.to_owned(); - let catalog_name = CATALOG_DEFAULT.to_owned(); - let snapshot_stream = FuseSnapshot::new_snapshot_history_stream( - ctx.clone(), - database_name, - table_name, - catalog_name, - limit, - ); - - // the underlying stream may returns a single empty block, which carries the schema - let source = StreamSourceNoSkipEmpty::create(ctx, Some(snapshot_stream), output.clone())?; - pipeline.add_pipe(Pipe::SimplePipe { inputs_port: vec![], - outputs_port: vec![output], - processors: vec![source], + outputs_port: vec![output.clone()], + processors: vec![FuseHistorySource::create( + ctx, + output, + self.arg_database_name.to_owned(), + self.arg_table_name.to_owned(), + )?], }); Ok(()) @@ -147,3 +137,57 @@ impl TableFunction for FuseSnapshotTable { self } } + +struct FuseHistorySource { + finish: bool, + ctx: Arc, + arg_database_name: String, + arg_table_name: String, +} + +impl FuseHistorySource { + pub fn create( + ctx: Arc, + output: Arc, + arg_database_name: String, + arg_table_name: String, + ) -> Result { + AsyncSourcer::create(ctx.clone(), output, FuseHistorySource { + ctx, + finish: false, + arg_table_name, + arg_database_name, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for FuseHistorySource { + const NAME: &'static str = "fuse_snapshot"; + + #[async_trait::unboxed_simple] + async fn generate(&mut self) -> Result> { + if self.finish { + return Ok(None); + } + + self.finish = true; + let tenant_id = self.ctx.get_tenant(); + let tbl = self + .ctx + .get_catalog(CATALOG_DEFAULT)? + .get_table( + tenant_id.as_str(), + self.arg_database_name.as_str(), + self.arg_table_name.as_str(), + ) + .await?; + + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + Ok(Some( + FuseSnapshot::new(self.ctx.clone(), tbl) + .get_snapshots() + .await?, + )) + } +} From 85df7f4c3a994f956d2d22c23ef88c0ef4fc92d0 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 15:03:19 +0800 Subject: [PATCH 03/10] Fix read_snapshots with last '/' and better the source name --- src/query/storages/fuse/src/fuse_snapshot.rs | 39 +++++++++++-------- .../fuse_segments/fuse_segment_table.rs | 10 ++--- .../fuse_snapshots/fuse_snapshot_table.rs | 10 ++--- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index 272891433961a..5fd7d0e3a544f 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -48,22 +48,29 @@ pub async fn read_snapshots_by_root_file( ) -> Result>>> { let mut snapshot_files = vec![]; if let Some(path) = Path::new(&root_snapshot_file).parent() { - let snapshot_prefix = path.to_str().unwrap_or(""); - if snapshot_prefix.contains('/') { - let mut ds = data_accessor.object(snapshot_prefix).list().await?; - // ObjectStreamer implements `futures::Stream` - while let Some(de) = ds.try_next().await? { - match de.mode() { - ObjectMode::FILE => { - snapshot_files.push(de.path().to_string()); - } - _ => { - warn!( - "find not snapshot file in {:}, found: {:?}", - snapshot_prefix, de - ); - continue; - } + let mut snapshot_prefix = path.to_str().unwrap_or("").to_string(); + + // Check if the prefix:db/table/_ss is reasonable. + if !snapshot_prefix.contains('/') { + return Ok(vec![]); + } + + if !snapshot_prefix.ends_with('/') { + snapshot_prefix += "/"; + } + + let mut ds = data_accessor.object(&snapshot_prefix).list().await?; + while let Some(de) = ds.try_next().await? { + match de.mode() { + ObjectMode::FILE => { + snapshot_files.push(de.path().to_string()); + } + _ => { + warn!( + "Found not snapshot file in {:}, found: {:?}", + snapshot_prefix, de + ); + continue; } } } diff --git a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs index f5565f394c9c2..9b3baf7032f2c 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs @@ -119,7 +119,7 @@ impl Table for FuseSegmentTable { pipeline.add_pipe(Pipe::SimplePipe { inputs_port: vec![], outputs_port: vec![output.clone()], - processors: vec![FuseHistorySource::create( + processors: vec![FuseSegmentSource::create( ctx, output, self.arg_database_name.to_owned(), @@ -132,7 +132,7 @@ impl Table for FuseSegmentTable { } } -struct FuseHistorySource { +struct FuseSegmentSource { finish: bool, ctx: Arc, arg_database_name: String, @@ -140,7 +140,7 @@ struct FuseHistorySource { arg_snapshot_id: String, } -impl FuseHistorySource { +impl FuseSegmentSource { pub fn create( ctx: Arc, output: Arc, @@ -148,7 +148,7 @@ impl FuseHistorySource { arg_table_name: String, arg_snapshot_id: String, ) -> Result { - AsyncSourcer::create(ctx.clone(), output, FuseHistorySource { + AsyncSourcer::create(ctx.clone(), output, FuseSegmentSource { ctx, finish: false, arg_table_name, @@ -159,7 +159,7 @@ impl FuseHistorySource { } #[async_trait::async_trait] -impl AsyncSource for FuseHistorySource { +impl AsyncSource for FuseSegmentSource { const NAME: &'static str = "fuse_segment"; #[async_trait::unboxed_simple] diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs index d44b0cfddd481..763378eb54a11 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs @@ -115,7 +115,7 @@ impl Table for FuseSnapshotTable { pipeline.add_pipe(Pipe::SimplePipe { inputs_port: vec![], outputs_port: vec![output.clone()], - processors: vec![FuseHistorySource::create( + processors: vec![FuseSnapshotSource::create( ctx, output, self.arg_database_name.to_owned(), @@ -138,21 +138,21 @@ impl TableFunction for FuseSnapshotTable { } } -struct FuseHistorySource { +struct FuseSnapshotSource { finish: bool, ctx: Arc, arg_database_name: String, arg_table_name: String, } -impl FuseHistorySource { +impl FuseSnapshotSource { pub fn create( ctx: Arc, output: Arc, arg_database_name: String, arg_table_name: String, ) -> Result { - AsyncSourcer::create(ctx.clone(), output, FuseHistorySource { + AsyncSourcer::create(ctx.clone(), output, FuseSnapshotSource { ctx, finish: false, arg_table_name, @@ -162,7 +162,7 @@ impl FuseHistorySource { } #[async_trait::async_trait] -impl AsyncSource for FuseHistorySource { +impl AsyncSource for FuseSnapshotSource { const NAME: &'static str = "fuse_snapshot"; #[async_trait::unboxed_simple] From 40e8448f263329f55303e764d36e17203b11db75 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 15:23:11 +0800 Subject: [PATCH 04/10] Add more comments for the read_snapshots --- src/query/storages/fuse/src/fuse_snapshot.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index 5fd7d0e3a544f..e48850a41f0e6 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -39,6 +39,10 @@ async fn read_snapshot( reader.read(snapshot_location, None, format_version).await } +// Read all the snapshots by the root file: +// 1. Get the prefix:'/db/table/_ss/' from the root_snapshot_file('/db/table/_ss/xx.json') +// 2. List all the files in the prefix +// 3. Try to read all the snapshot files in parallel. #[tracing::instrument(level = "debug", skip_all)] pub async fn read_snapshots_by_root_file( ctx: Arc, @@ -55,10 +59,12 @@ pub async fn read_snapshots_by_root_file( return Ok(vec![]); } + // Append '/' to the end if need. if !snapshot_prefix.ends_with('/') { snapshot_prefix += "/"; } + // List the prefix path to get all the snapshot files list. let mut ds = data_accessor.object(&snapshot_prefix).list().await?; while let Some(de) = ds.try_next().await? { match de.mode() { @@ -76,6 +82,11 @@ pub async fn read_snapshots_by_root_file( } } + // Only return if no snapshot files . + if !snapshot_files.is_empty() { + return Ok(vec![]); + } + let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; From d50c87f77457de2d95448b352da30b9d26f42089 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 17:36:50 +0800 Subject: [PATCH 05/10] Add snapshot chain --- src/query/storages/fuse/src/fuse_snapshot.rs | 45 ++++++++++++++++--- .../fuse_snapshots/fuse_snapshot.rs | 3 +- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index e48850a41f0e6..71ac606cef788 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -35,7 +36,7 @@ async fn read_snapshot( snapshot_location: String, format_version: u64, ) -> Result> { - let reader = MetaReaders::table_snapshot_reader(ctx); + let reader = MetaReaders::table_snapshot_reader(ctx.clone()); reader.read(snapshot_location, None, format_version).await } @@ -49,7 +50,7 @@ pub async fn read_snapshots_by_root_file( root_snapshot_file: String, format_version: u64, data_accessor: &Operator, -) -> Result>>> { +) -> Result>> { let mut snapshot_files = vec![]; if let Some(path) = Path::new(&root_snapshot_file).parent() { let mut snapshot_prefix = path.to_str().unwrap_or("").to_string(); @@ -69,7 +70,10 @@ pub async fn read_snapshots_by_root_file( while let Some(de) = ds.try_next().await? { match de.mode() { ObjectMode::FILE => { - snapshot_files.push(de.path().to_string()); + let location = de.path().to_string(); + if location != root_snapshot_file { + snapshot_files.push(de.path().to_string()); + } } _ => { warn!( @@ -87,17 +91,18 @@ pub async fn read_snapshots_by_root_file( return Ok(vec![]); } + // 1. Get all the snapshot with parallel. let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; // 1.1 combine all the tasks. + let ctx_clone = ctx.clone(); let mut iter = snapshot_files.iter(); let tasks = std::iter::from_fn(move || { if let Some(location) = iter.next() { - let ctx = ctx.clone(); let location = location.clone(); Some( - read_snapshot(ctx, location, format_version) + read_snapshot(ctx_clone.clone(), location, format_version) .instrument(tracing::debug_span!("read_snapshot")), ) } else { @@ -122,5 +127,33 @@ pub async fn read_snapshots_by_root_file( .instrument(tracing::debug_span!("read_snapshots_join_all")) .await .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))?; - Ok(joint) + + // 2. Build the snapshot chain from root. + // 2.1 Build all the snapshot map, key is snapshot_id, value is the snapshot object. + let mut snapshot_map = HashMap::with_capacity(snapshot_files.len()); + for snapshot in joint.into_iter().flatten() { + snapshot_map.insert(snapshot.snapshot_id, snapshot); + } + + // 2.2 Get the root snapshot. + let root_snapshot = read_snapshot(ctx.clone(), root_snapshot_file, format_version).await?; + + // 2.3 Chain the snapshots from root to the oldest. + let mut snapshot_chain = Vec::with_capacity(snapshot_map.len()); + snapshot_chain.push(root_snapshot.clone()); + let mut prev_snapshot_id_tuple = root_snapshot.prev_snapshot_id; + while let Some((prev_snapshot_id, _)) = prev_snapshot_id_tuple { + let prev_snapshot = snapshot_map.get(&prev_snapshot_id); + match prev_snapshot { + None => { + break; + } + Some(prev_snapshot) => { + snapshot_chain.push(prev_snapshot.clone()); + prev_snapshot_id_tuple = prev_snapshot.prev_snapshot_id; + } + } + } + + Ok(snapshot_chain) } diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index 06e77614abbdc..a8d37819933c4 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -39,14 +39,13 @@ impl<'a> FuseSnapshot<'a> { let snapshot_location = self.table.snapshot_loc(); if let Some(snapshot_location) = snapshot_location { let snapshot_version = self.table.snapshot_format_version(); - let snapshots_results = read_snapshots_by_root_file( + let snapshots = read_snapshots_by_root_file( self.ctx.clone(), snapshot_location, snapshot_version, &self.table.operator, ) .await?; - let snapshots = snapshots_results.into_iter().flatten().collect(); return self.to_block(&meta_location_generator, snapshots, snapshot_version); } Ok(DataBlock::empty_with_schema(FuseSnapshot::schema())) From a5a9bb5754c4058325a2e300aff8c688da77be70 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 17:47:04 +0800 Subject: [PATCH 06/10] Make read_snapshots into chunks --- src/query/storages/fuse/src/fuse_snapshot.rs | 94 +++++++++++--------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index 71ac606cef788..b39f39b5820ab 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -40,6 +40,49 @@ async fn read_snapshot( reader.read(snapshot_location, None, format_version).await } +#[tracing::instrument(level = "debug", skip_all)] +pub async fn read_snapshots( + ctx: Arc, + snapshot_files: &[String], + format_version: u64, +) -> Result>>> { + let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; + let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; + + // 1.1 combine all the tasks. + let ctx_clone = ctx.clone(); + let mut iter = snapshot_files.iter(); + let tasks = std::iter::from_fn(move || { + if let Some(location) = iter.next() { + let location = location.clone(); + Some( + read_snapshot(ctx_clone.clone(), location, format_version) + .instrument(tracing::debug_span!("read_snapshot")), + ) + } else { + None + } + }); + + // 1.2 build the runtime. + let semaphore = Arc::new(Semaphore::new(max_io_requests)); + let segments_runtime = Arc::new(Runtime::with_worker_threads( + max_runtime_threads, + Some("fuse-req-snapshots-worker".to_owned()), + )?); + + // 1.3 spawn all the tasks to the runtime. + let join_handlers = segments_runtime + .try_spawn_batch(semaphore.clone(), tasks) + .await?; + + // 1.4 get all the result. + future::try_join_all(join_handlers) + .instrument(tracing::debug_span!("read_snapshots_join_all")) + .await + .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e))) +} + // Read all the snapshots by the root file: // 1. Get the prefix:'/db/table/_ss/' from the root_snapshot_file('/db/table/_ss/xx.json') // 2. List all the files in the prefix @@ -91,54 +134,21 @@ pub async fn read_snapshots_by_root_file( return Ok(vec![]); } - // 1. Get all the snapshot with parallel. - let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize; + // 1. Get all the snapshot by chunks. let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; - - // 1.1 combine all the tasks. - let ctx_clone = ctx.clone(); - let mut iter = snapshot_files.iter(); - let tasks = std::iter::from_fn(move || { - if let Some(location) = iter.next() { - let location = location.clone(); - Some( - read_snapshot(ctx_clone.clone(), location, format_version) - .instrument(tracing::debug_span!("read_snapshot")), - ) - } else { - None - } - }); - - // 1.2 build the runtime. - let semaphore = Arc::new(Semaphore::new(max_io_requests)); - let segments_runtime = Arc::new(Runtime::with_worker_threads( - max_runtime_threads, - Some("fuse-req-snapshots-worker".to_owned()), - )?); - - // 1.3 spawn all the tasks to the runtime. - let join_handlers = segments_runtime - .try_spawn_batch(semaphore.clone(), tasks) - .await?; - - // 1.4 get all the result. - let joint: Vec>> = future::try_join_all(join_handlers) - .instrument(tracing::debug_span!("read_snapshots_join_all")) - .await - .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))?; - - // 2. Build the snapshot chain from root. - // 2.1 Build all the snapshot map, key is snapshot_id, value is the snapshot object. let mut snapshot_map = HashMap::with_capacity(snapshot_files.len()); - for snapshot in joint.into_iter().flatten() { - snapshot_map.insert(snapshot.snapshot_id, snapshot); + for chunks in snapshot_files.chunks(max_io_requests) { + let results = read_snapshots(ctx.clone(), chunks, format_version).await?; + for snapshot in results.into_iter().flatten() { + snapshot_map.insert(snapshot.snapshot_id, snapshot); + } } - // 2.2 Get the root snapshot. + // 2. Build the snapshot chain from root. + // 2.1 Get the root snapshot. let root_snapshot = read_snapshot(ctx.clone(), root_snapshot_file, format_version).await?; - // 2.3 Chain the snapshots from root to the oldest. + // 2.2 Chain the snapshots from root to the oldest. let mut snapshot_chain = Vec::with_capacity(snapshot_map.len()); snapshot_chain.push(root_snapshot.clone()); let mut prev_snapshot_id_tuple = root_snapshot.prev_snapshot_id; From cc23d1a2cc628a4b217719cfab3ade528e063ed2 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 18:54:11 +0800 Subject: [PATCH 07/10] Add TableSnapshotLite for less memory --- .../fuse-meta/src/meta/current/mod.rs | 1 + .../storages/fuse-meta/src/meta/v1/mod.rs | 1 + .../fuse-meta/src/meta/v1/snapshot.rs | 36 +++++++++++++++++++ src/query/storages/fuse/src/fuse_snapshot.rs | 19 +++++----- .../fuse_snapshots/fuse_snapshot.rs | 14 ++++---- 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/query/storages/fuse-meta/src/meta/current/mod.rs b/src/query/storages/fuse-meta/src/meta/current/mod.rs index aa700f299da6b..c8088ba03616d 100644 --- a/src/query/storages/fuse-meta/src/meta/current/mod.rs +++ b/src/query/storages/fuse-meta/src/meta/current/mod.rs @@ -17,6 +17,7 @@ pub use v1::BlockFilter; pub use v1::BlockMeta; pub use v1::SegmentInfo; pub use v1::TableSnapshot; +pub use v1::TableSnapshotLite; use super::v0; use super::v1; diff --git a/src/query/storages/fuse-meta/src/meta/v1/mod.rs b/src/query/storages/fuse-meta/src/meta/v1/mod.rs index b6f2c7f7cce4f..1d9c2d03b597b 100644 --- a/src/query/storages/fuse-meta/src/meta/v1/mod.rs +++ b/src/query/storages/fuse-meta/src/meta/v1/mod.rs @@ -20,3 +20,4 @@ pub use index::BlockFilter; pub use segment::BlockMeta; pub use segment::SegmentInfo; pub use snapshot::TableSnapshot; +pub use snapshot::TableSnapshotLite; diff --git a/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs b/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs index 001e3c7e099d9..5641cb506d7f1 100644 --- a/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs +++ b/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs @@ -125,6 +125,42 @@ impl From for TableSnapshot { } } +// A memory light version of TableSnapshot(Without segments) +// This *ONLY* used for some optimize operation, like PURGE/FUSE_SNAPSHOT function to avoid OOM. +#[derive(Clone, Debug)] +pub struct TableSnapshotLite { + /// format version of snapshot + pub format_version: FormatVersion, + + /// id of snapshot + pub snapshot_id: SnapshotId, + + /// timestamp of this snapshot + // for backward compatibility, `Option` is used + pub timestamp: Option>, + + /// previous snapshot + pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, + + /// Summary Statistics + pub summary: Statistics, + + pub segment_count: usize, +} + +impl From<&TableSnapshot> for TableSnapshotLite { + fn from(value: &TableSnapshot) -> Self { + TableSnapshotLite { + format_version: value.format_version(), + snapshot_id: value.snapshot_id, + timestamp: value.timestamp, + prev_snapshot_id: value.prev_snapshot_id, + summary: value.summary.clone(), + segment_count: value.segments.len(), + } + } +} + mod util { use chrono::Datelike; use chrono::TimeZone; diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index b39f39b5820ab..c46df56d18edb 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -22,6 +22,7 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_fuse_meta::meta::TableSnapshot; +use common_fuse_meta::meta::TableSnapshotLite; use futures_util::future; use futures_util::TryStreamExt; use opendal::ObjectMode; @@ -88,12 +89,12 @@ pub async fn read_snapshots( // 2. List all the files in the prefix // 3. Try to read all the snapshot files in parallel. #[tracing::instrument(level = "debug", skip_all)] -pub async fn read_snapshots_by_root_file( +pub async fn read_snapshot_lites_by_root_file( ctx: Arc, root_snapshot_file: String, format_version: u64, data_accessor: &Operator, -) -> Result>> { +) -> Result> { let mut snapshot_files = vec![]; if let Some(path) = Path::new(&root_snapshot_file).parent() { let mut snapshot_prefix = path.to_str().unwrap_or("").to_string(); @@ -130,7 +131,7 @@ pub async fn read_snapshots_by_root_file( } // Only return if no snapshot files . - if !snapshot_files.is_empty() { + if snapshot_files.is_empty() { return Ok(vec![]); } @@ -140,21 +141,23 @@ pub async fn read_snapshots_by_root_file( for chunks in snapshot_files.chunks(max_io_requests) { let results = read_snapshots(ctx.clone(), chunks, format_version).await?; for snapshot in results.into_iter().flatten() { - snapshot_map.insert(snapshot.snapshot_id, snapshot); + let snapshot_lite = TableSnapshotLite::from(snapshot.as_ref()); + snapshot_map.insert(snapshot_lite.snapshot_id, snapshot_lite); } } // 2. Build the snapshot chain from root. // 2.1 Get the root snapshot. let root_snapshot = read_snapshot(ctx.clone(), root_snapshot_file, format_version).await?; + let root_snapshot_lite = TableSnapshotLite::from(root_snapshot.as_ref()); // 2.2 Chain the snapshots from root to the oldest. let mut snapshot_chain = Vec::with_capacity(snapshot_map.len()); - snapshot_chain.push(root_snapshot.clone()); - let mut prev_snapshot_id_tuple = root_snapshot.prev_snapshot_id; + snapshot_chain.push(root_snapshot_lite.clone()); + let mut prev_snapshot_id_tuple = root_snapshot_lite.prev_snapshot_id; while let Some((prev_snapshot_id, _)) = prev_snapshot_id_tuple { - let prev_snapshot = snapshot_map.get(&prev_snapshot_id); - match prev_snapshot { + let prev_snapshot_lite = snapshot_map.get(&prev_snapshot_id); + match prev_snapshot_lite { None => { break; } diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index a8d37819933c4..4fd8deef67402 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_fuse_meta::meta::TableSnapshot; +use common_fuse_meta::meta::TableSnapshotLite; -use crate::fuse_snapshot::read_snapshots_by_root_file; +use crate::fuse_snapshot::read_snapshot_lites_by_root_file; use crate::io::TableMetaLocationGenerator; use crate::sessions::TableContext; use crate::FuseTable; @@ -39,14 +39,14 @@ impl<'a> FuseSnapshot<'a> { let snapshot_location = self.table.snapshot_loc(); if let Some(snapshot_location) = snapshot_location { let snapshot_version = self.table.snapshot_format_version(); - let snapshots = read_snapshots_by_root_file( + let snapshots = read_snapshot_lites_by_root_file( self.ctx.clone(), snapshot_location, snapshot_version, &self.table.operator, ) .await?; - return self.to_block(&meta_location_generator, snapshots, snapshot_version); + return self.to_block(&meta_location_generator, &snapshots, snapshot_version); } Ok(DataBlock::empty_with_schema(FuseSnapshot::schema())) } @@ -54,7 +54,7 @@ impl<'a> FuseSnapshot<'a> { fn to_block( &self, location_generator: &TableMetaLocationGenerator, - snapshots: Vec>, + snapshots: &[TableSnapshotLite], latest_snapshot_version: u64, ) -> Result { let len = snapshots.len(); @@ -82,8 +82,8 @@ impl<'a> FuseSnapshot<'a> { None => (None, 0), }; prev_snapshot_ids.push(id); - format_versions.push(current_snapshot_version); - segment_count.push(s.segments.len() as u64); + format_versions.push(s.format_version); + segment_count.push(s.segment_count as u64); block_count.push(s.summary.block_count); row_count.push(s.summary.row_count); compressed.push(s.summary.compressed_byte_size); From 37e2e6aba32ef13616aba56f9498649e45d31010 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 19:37:38 +0800 Subject: [PATCH 08/10] Change the chunk size to 5x requests --- src/query/storages/fuse/src/fuse_snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index c46df56d18edb..2e7c53bc47063 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -138,7 +138,7 @@ pub async fn read_snapshot_lites_by_root_file( // 1. Get all the snapshot by chunks. let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; let mut snapshot_map = HashMap::with_capacity(snapshot_files.len()); - for chunks in snapshot_files.chunks(max_io_requests) { + for chunks in snapshot_files.chunks(max_io_requests * 5) { let results = read_snapshots(ctx.clone(), chunks, format_version).await?; for snapshot in results.into_iter().flatten() { let snapshot_lite = TableSnapshotLite::from(snapshot.as_ref()); From ccdbfd5d34445bda9844549fa10f93407d069849 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 20:54:50 +0800 Subject: [PATCH 09/10] Fix the last root snapshot --- src/query/storages/fuse/src/fuse_snapshot.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs index 963843781b90c..430de02d9a890 100644 --- a/src/query/storages/fuse/src/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/fuse_snapshot.rs @@ -137,11 +137,6 @@ pub async fn read_snapshot_lites_by_root_file( } } - // Only return if no snapshot files . - if snapshot_files.is_empty() { - return Ok(vec![]); - } - // 1. Get all the snapshot by chunks. let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; let mut snapshot_map = HashMap::with_capacity(snapshot_files.len()); From 20585cd746add276e7d970c643cc0756c4447bf0 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 11 Oct 2022 21:16:38 +0800 Subject: [PATCH 10/10] Refine the unit test --- .../table_functions/fuse_snapshot_table.rs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs index 8e650c1dffd93..9ac35e0251f90 100644 --- a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs +++ b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs @@ -86,22 +86,22 @@ async fn test_fuse_snapshot_table_read() -> Result<()> { let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?; interpreter.execute(ctx.clone()).await?; - // func args - let arg_db = LegacyExpression::create_literal(DataValue::String(db.as_bytes().to_vec())); - let arg_tbl = LegacyExpression::create_literal(DataValue::String(tbl.as_bytes().to_vec())); - { let expected = vec![ - "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+", - "| snapshot_id | snapshot_location | format_version | previous_snapshot_id | segment_count | block_count | row_count | bytes_uncompressed | bytes_compressed | index_size | timestamp |", - "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+", - "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+", + "+-------+", + "| count |", + "+-------+", + "| 0 |", + "+-------+", ]; + let qry = format!( + "select count(1) as count from fuse_snapshot('{}', '{}')", + db, tbl + ); expects_ok( - "empty_data_set", - test_drive_with_args_and_ctx(Some(vec![arg_db.clone(), arg_tbl.clone()]), ctx.clone()) - .await, + "count_should_be_0", + execute_query(ctx.clone(), qry.as_str()).await, expected, ) .await?;