diff --git a/Cargo.lock b/Cargo.lock
index f45fa8a3359f1..b7a2a9353eeec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2053,7 +2053,6 @@ dependencies = [
  "common-storage",
  "common-storages-index",
  "common-storages-util",
- "common-streams",
  "futures",
  "futures-util",
  "opendal",
diff --git a/src/query/service/src/procedures/systems/fuse_snapshot.rs b/src/query/service/src/procedures/systems/fuse_snapshot.rs
index c46ff8c8395e8..d68d5a2bee148 100644
--- a/src/query/service/src/procedures/systems/fuse_snapshot.rs
+++ b/src/query/service/src/procedures/systems/fuse_snapshot.rs
@@ -14,11 +14,12 @@
 use std::sync::Arc;
 
+use common_datablocks::DataBlock;
 use common_datavalues::DataSchema;
 use common_exception::Result;
-use common_streams::SendableDataBlockStream;
+use common_storages_fuse::FuseTable;
 
-use crate::procedures::procedure::StreamProcedure;
+use crate::procedures::OneBlockProcedure;
 use crate::procedures::Procedure;
 use crate::procedures::ProcedureFeatures;
 use crate::sessions::QueryContext;
@@ -35,7 +36,7 @@ impl FuseSnapshotProcedure {
 }
 
 #[async_trait::async_trait]
-impl StreamProcedure for FuseSnapshotProcedure {
+impl OneBlockProcedure for FuseSnapshotProcedure {
     fn name(&self) -> &str {
         "FUSE_SNAPSHOT"
     }
@@ -44,19 +45,12 @@ impl StreamProcedure for FuseSnapshotProcedure {
         ProcedureFeatures::default().variadic_arguments(2, 3)
     }
 
-    async fn data_stream(
-        &self,
-        ctx: Arc<QueryContext>,
-        args: Vec<String>,
-    ) -> Result<SendableDataBlockStream> {
-        let catalog_name = ctx.get_current_catalog();
-        let tenant_id = ctx.get_tenant();
+    async fn all_data(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
         let database_name = args[0].clone();
         let table_name = args[1].clone();
-
-        let limit = args.get(2).map(|arg| arg.parse::<usize>()).transpose()?;
+        let tenant_id = ctx.get_tenant();
         let tbl = ctx
-            .get_catalog(&catalog_name)?
+            .get_catalog(&ctx.get_current_catalog())?
             .get_table(
                 tenant_id.as_str(),
                 database_name.as_str(),
@@ -64,7 +58,9 @@ impl StreamProcedure for FuseSnapshotProcedure {
             )
            .await?;
 
-        FuseSnapshot::new(ctx, tbl).get_history_stream_as_blocks(limit)
+        let tbl = FuseTable::try_from_table(tbl.as_ref())?;
+
+        Ok(FuseSnapshot::new(ctx, tbl).get_snapshots().await?)
     }
 
     fn schema(&self) -> Arc<DataSchema> {
diff --git a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs
index 8e650c1dffd93..9ac35e0251f90 100644
--- a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs
+++ b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs
@@ -86,22 +86,22 @@ async fn test_fuse_snapshot_table_read() -> Result<()> {
     let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
     interpreter.execute(ctx.clone()).await?;
 
-    // func args
-    let arg_db = LegacyExpression::create_literal(DataValue::String(db.as_bytes().to_vec()));
-    let arg_tbl = LegacyExpression::create_literal(DataValue::String(tbl.as_bytes().to_vec()));
-
     {
         let expected = vec![
-            "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+",
-            "| snapshot_id | snapshot_location | format_version | previous_snapshot_id | segment_count | block_count | row_count | bytes_uncompressed | bytes_compressed | index_size | timestamp |",
-            "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+",
-            "+-------------+-------------------+----------------+----------------------+---------------+-------------+-----------+--------------------+------------------+------------+-----------+",
+            "+-------+",
+            "| count |",
+            "+-------+",
+            "| 0     |",
+            "+-------+",
         ];
+        let qry = format!(
+            "select count(1) as count from fuse_snapshot('{}', '{}')",
+            db, tbl
+        );
         expects_ok(
-            "empty_data_set",
-            test_drive_with_args_and_ctx(Some(vec![arg_db.clone(), arg_tbl.clone()]), ctx.clone())
-                .await,
+            "count_should_be_0",
+            execute_query(ctx.clone(), qry.as_str()).await,
             expected,
         )
         .await?;
diff --git a/src/query/storages/fuse-meta/src/meta/current/mod.rs b/src/query/storages/fuse-meta/src/meta/current/mod.rs
index aa700f299da6b..c8088ba03616d 100644
--- a/src/query/storages/fuse-meta/src/meta/current/mod.rs
+++ b/src/query/storages/fuse-meta/src/meta/current/mod.rs
@@ -17,6 +17,7 @@ pub use v1::BlockFilter;
 pub use v1::BlockMeta;
 pub use v1::SegmentInfo;
 pub use v1::TableSnapshot;
+pub use v1::TableSnapshotLite;
 
 use super::v0;
 use super::v1;
diff --git a/src/query/storages/fuse-meta/src/meta/v1/mod.rs b/src/query/storages/fuse-meta/src/meta/v1/mod.rs
index b6f2c7f7cce4f..1d9c2d03b597b 100644
--- a/src/query/storages/fuse-meta/src/meta/v1/mod.rs
+++ b/src/query/storages/fuse-meta/src/meta/v1/mod.rs
@@ -20,3 +20,4 @@ pub use index::BlockFilter;
 pub use segment::BlockMeta;
 pub use segment::SegmentInfo;
 pub use snapshot::TableSnapshot;
+pub use snapshot::TableSnapshotLite;
diff --git a/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs b/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs
index 001e3c7e099d9..5641cb506d7f1 100644
--- a/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs
+++ b/src/query/storages/fuse-meta/src/meta/v1/snapshot.rs
@@ -125,6 +125,42 @@ impl From for TableSnapshot {
     }
 }
 
+// A lightweight version of TableSnapshot (without the segment locations).
+// It is *ONLY* used by maintenance operations, such as PURGE and the FUSE_SNAPSHOT table function, to avoid OOM.
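+// Compared with TableSnapshot, the segment locations are dropped; only `segment_count`
+// and the summary statistics are kept, which is what these operations need.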
+#[derive(Clone, Debug)]
+pub struct TableSnapshotLite {
+    /// format version of snapshot
+    pub format_version: FormatVersion,
+
+    /// id of snapshot
+    pub snapshot_id: SnapshotId,
+
+    /// timestamp of this snapshot
+    // for backward compatibility, `Option` is used
+    pub timestamp: Option<DateTime<Utc>>,
+
+    /// previous snapshot
+    pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
+
+    /// Summary Statistics
+    pub summary: Statistics,
+
+    pub segment_count: usize,
+}
+
+impl From<&TableSnapshot> for TableSnapshotLite {
+    fn from(value: &TableSnapshot) -> Self {
+        TableSnapshotLite {
+            format_version: value.format_version(),
+            snapshot_id: value.snapshot_id,
+            timestamp: value.timestamp,
+            prev_snapshot_id: value.prev_snapshot_id,
+            summary: value.summary.clone(),
+            segment_count: value.segments.len(),
+        }
+    }
+}
+
 mod util {
     use chrono::Datelike;
     use chrono::TimeZone;
diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml
index d88bb65d1e8b6..9c19d3c58d2df 100644
--- a/src/query/storages/fuse/Cargo.toml
+++ b/src/query/storages/fuse/Cargo.toml
@@ -32,7 +32,6 @@ common-pipeline-transforms = { path = "../../pipeline/transforms" }
 common-storage = { path = "../../../common/storage" }
 common-storages-index = { path = "../index" }
 common-storages-util = { path = "../util" }
-common-streams = { path = "../../streams" }
 
 async-trait = { version = "0.1.57", package = "async-trait-fn" }
 backoff = { version = "0.4.0", features = ["futures", "tokio"] }
diff --git a/src/query/storages/fuse/src/fuse_snapshot.rs b/src/query/storages/fuse/src/fuse_snapshot.rs
new file mode 100644
index 0000000000000..430de02d9a890
--- /dev/null
+++ b/src/query/storages/fuse/src/fuse_snapshot.rs
@@ -0,0 +1,181 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::Arc;
+
+use common_base::base::tokio::sync::Semaphore;
+use common_base::base::Runtime;
+use common_catalog::table_context::TableContext;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_fuse_meta::meta::TableSnapshot;
+use common_fuse_meta::meta::TableSnapshotLite;
+use futures_util::future;
+use futures_util::TryStreamExt;
+use opendal::ObjectMode;
+use opendal::Operator;
+use tracing::warn;
+use tracing::Instrument;
+
+use crate::io::MetaReaders;
+
+async fn read_snapshot(
+    ctx: Arc<dyn TableContext>,
+    snapshot_location: String,
+    format_version: u64,
+    data_accessor: Operator,
+) -> Result<Arc<TableSnapshot>> {
+    let reader = MetaReaders::table_snapshot_reader(ctx.clone(), data_accessor);
+    reader.read(snapshot_location, None, format_version).await
+}
+
+#[tracing::instrument(level = "debug", skip_all)]
+pub async fn read_snapshots(
+    ctx: Arc<dyn TableContext>,
+    snapshot_files: &[String],
+    format_version: u64,
+    data_accessor: Operator,
+) -> Result<Vec<Option<Arc<TableSnapshot>>>> {
+    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
+    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
+
+    // 1.1 combine all the tasks.
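+    // The read tasks are built lazily via `std::iter::from_fn`; nothing is fetched until
+    // they are spawned onto the dedicated runtime below, where the semaphore (sized by
+    // `max_storage_io_requests`) is used to throttle concurrent snapshot reads.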
+    let ctx_clone = ctx.clone();
+    let mut iter = snapshot_files.iter();
+    let tasks = std::iter::from_fn(move || {
+        if let Some(location) = iter.next() {
+            let location = location.clone();
+            Some(
+                read_snapshot(
+                    ctx_clone.clone(),
+                    location,
+                    format_version,
+                    data_accessor.clone(),
+                )
+                .instrument(tracing::debug_span!("read_snapshot")),
+            )
+        } else {
+            None
+        }
+    });
+
+    // 1.2 build the runtime.
+    let semaphore = Arc::new(Semaphore::new(max_io_requests));
+    let segments_runtime = Arc::new(Runtime::with_worker_threads(
+        max_runtime_threads,
+        Some("fuse-req-snapshots-worker".to_owned()),
+    )?);
+
+    // 1.3 spawn all the tasks to the runtime.
+    let join_handlers = segments_runtime
+        .try_spawn_batch(semaphore.clone(), tasks)
+        .await?;
+
+    // 1.4 get all the results.
+    future::try_join_all(join_handlers)
+        .instrument(tracing::debug_span!("read_snapshots_join_all"))
+        .await
+        .map_err(|e| ErrorCode::StorageOther(format!("failed to read snapshots: {}", e)))
+}
+
+// Read all the snapshots of a table, given its root snapshot file:
+// 1. Derive the prefix '/db/table/_ss/' from the root_snapshot_file ('/db/table/_ss/xx.json').
+// 2. List all the files under that prefix.
+// 3. Read all the listed snapshot files in parallel.
+#[tracing::instrument(level = "debug", skip_all)]
+pub async fn read_snapshot_lites_by_root_file(
+    ctx: Arc<dyn TableContext>,
+    root_snapshot_file: String,
+    format_version: u64,
+    data_accessor: &Operator,
+) -> Result<Vec<TableSnapshotLite>> {
+    let mut snapshot_files = vec![];
+    if let Some(path) = Path::new(&root_snapshot_file).parent() {
+        let mut snapshot_prefix = path.to_str().unwrap_or("").to_string();
+
+        // Check that the prefix (db/table/_ss) looks reasonable.
+        if !snapshot_prefix.contains('/') {
+            return Ok(vec![]);
+        }
+
+        // Append '/' to the end if needed.
+        if !snapshot_prefix.ends_with('/') {
+            snapshot_prefix += "/";
+        }
+
+        // List the prefix path to get all the snapshot files.
+        let mut ds = data_accessor.object(&snapshot_prefix).list().await?;
+        while let Some(de) = ds.try_next().await? {
+            match de.mode() {
+                ObjectMode::FILE => {
+                    let location = de.path().to_string();
+                    if location != root_snapshot_file {
+                        snapshot_files.push(de.path().to_string());
+                    }
+                }
+                _ => {
+                    warn!(
+                        "Found a non-file entry under {}, skipping: {:?}",
+                        snapshot_prefix, de
+                    );
+                    continue;
+                }
+            }
+        }
+    }
+
+    // 1. Read all the listed snapshots, in chunks.
+    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
+    let mut snapshot_map = HashMap::with_capacity(snapshot_files.len());
+    for chunks in snapshot_files.chunks(max_io_requests * 5) {
+        let results =
+            read_snapshots(ctx.clone(), chunks, format_version, data_accessor.clone()).await?;
+        for snapshot in results.into_iter().flatten() {
+            let snapshot_lite = TableSnapshotLite::from(snapshot.as_ref());
+            snapshot_map.insert(snapshot_lite.snapshot_id, snapshot_lite);
+        }
+    }
+
+    // 2. Build the snapshot chain from the root.
+    // 2.1 Read the root snapshot.
+    let root_snapshot = read_snapshot(
+        ctx.clone(),
+        root_snapshot_file,
+        format_version,
+        data_accessor.clone(),
+    )
+    .await?;
+    let root_snapshot_lite = TableSnapshotLite::from(root_snapshot.as_ref());
+
+    // 2.2 Chain the snapshots from the root to the oldest.
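+    // Walk the `prev_snapshot_id` links starting from the root. If a referenced snapshot is
+    // missing from `snapshot_map` (for example, its file was not listed or has been removed),
+    // the chain simply stops there instead of failing.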
+ let mut snapshot_chain = Vec::with_capacity(snapshot_map.len()); + snapshot_chain.push(root_snapshot_lite.clone()); + let mut prev_snapshot_id_tuple = root_snapshot_lite.prev_snapshot_id; + while let Some((prev_snapshot_id, _)) = prev_snapshot_id_tuple { + let prev_snapshot_lite = snapshot_map.get(&prev_snapshot_id); + match prev_snapshot_lite { + None => { + break; + } + Some(prev_snapshot) => { + snapshot_chain.push(prev_snapshot.clone()); + prev_snapshot_id_tuple = prev_snapshot.prev_snapshot_id; + } + } + } + + Ok(snapshot_chain) +} diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index f9f3b7dd9bda6..de4a29b5a0d72 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -20,6 +20,7 @@ mod constants; mod fuse_lazy_part; mod fuse_part; mod fuse_segment; +mod fuse_snapshot; mod fuse_table; pub mod io; pub mod operations; diff --git a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs index f212902f2b849..9b3baf7032f2c 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs @@ -119,7 +119,7 @@ impl Table for FuseSegmentTable { pipeline.add_pipe(Pipe::SimplePipe { inputs_port: vec![], outputs_port: vec![output.clone()], - processors: vec![FuseHistorySource::create( + processors: vec![FuseSegmentSource::create( ctx, output, self.arg_database_name.to_owned(), @@ -132,7 +132,7 @@ impl Table for FuseSegmentTable { } } -struct FuseHistorySource { +struct FuseSegmentSource { finish: bool, ctx: Arc, arg_database_name: String, @@ -140,7 +140,7 @@ struct FuseHistorySource { arg_snapshot_id: String, } -impl FuseHistorySource { +impl FuseSegmentSource { pub fn create( ctx: Arc, output: Arc, @@ -148,7 +148,7 @@ impl FuseHistorySource { arg_table_name: String, arg_snapshot_id: String, ) -> Result { - AsyncSourcer::create(ctx.clone(), output, FuseHistorySource { + AsyncSourcer::create(ctx.clone(), output, FuseSegmentSource { ctx, finish: false, arg_table_name, @@ -159,8 +159,8 @@ impl FuseHistorySource { } #[async_trait::async_trait] -impl AsyncSource for FuseHistorySource { - const NAME: &'static str = "fuse_snapshot"; +impl AsyncSource for FuseSegmentSource { + const NAME: &'static str = "fuse_segment"; #[async_trait::unboxed_simple] async fn generate(&mut self) -> Result> { diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index 892b3571f2907..4fd8deef67402 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -14,116 +14,47 @@ use std::sync::Arc; -use common_catalog::table::Table; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_fuse_meta::meta::TableSnapshot; -use common_streams::DataBlockStream; -use common_streams::SendableDataBlockStream; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; +use common_fuse_meta::meta::TableSnapshotLite; -use crate::io::MetaReaders; -use crate::io::SnapshotHistoryReader; +use crate::fuse_snapshot::read_snapshot_lites_by_root_file; use crate::io::TableMetaLocationGenerator; use crate::sessions::TableContext; use crate::FuseTable; -pub struct 
FuseSnapshot { +pub struct FuseSnapshot<'a> { pub ctx: Arc, - pub table: Arc, + pub table: &'a FuseTable, } -impl FuseSnapshot { - pub fn new(ctx: Arc, table: Arc) -> Self { +impl<'a> FuseSnapshot<'a> { + pub fn new(ctx: Arc, table: &'a FuseTable) -> Self { Self { ctx, table } } - /// Get table snapshot history as stream of data block - /// For cases that caller inside sync context, i.e. using async catalog api is not convenient - pub fn new_snapshot_history_stream( - ctx: Arc, - database_name: String, - table_name: String, - catalog_name: String, - limit: Option, - ) -> SendableDataBlockStream { - // prepare the future that resolved the table - let table_instance = { - let ctx = ctx.clone(); - async move { - let tenant_id = ctx.get_tenant(); - let tbl = ctx - .get_catalog(catalog_name.as_str())? - .get_table( - tenant_id.as_str(), - database_name.as_str(), - table_name.as_str(), - ) - .await?; - Ok(tbl) - } - }; - - // chain the future with a stream of snapshot, for the resolved table, as data blocks - let resolved_table = futures::stream::once(table_instance); - let stream = resolved_table.map(move |item: Result<_>| match item { - Ok(table) => { - let fuse_snapshot = FuseSnapshot::new(ctx.clone(), table); - Ok(fuse_snapshot.get_history_stream_as_blocks(limit)?) - } - Err(e) => Err(e), - }); - - // flat it into single stream of data blocks - stream.try_flatten().boxed() - } - - pub fn get_history_stream_as_blocks( - self, - limit: Option, - ) -> Result { - let tbl = FuseTable::try_from_table(self.table.as_ref())?; - let snapshot_location = tbl.snapshot_loc(); - let meta_location_generator = tbl.meta_location_generator.clone(); + pub async fn get_snapshots(self) -> Result { + let meta_location_generator = self.table.meta_location_generator.clone(); + let snapshot_location = self.table.snapshot_loc(); if let Some(snapshot_location) = snapshot_location { - let snapshot_version = tbl.snapshot_format_version(); - let snapshot_reader = - MetaReaders::table_snapshot_reader(self.ctx.clone(), tbl.get_operator()); - let snapshot_stream = snapshot_reader.snapshot_history( + let snapshot_version = self.table.snapshot_format_version(); + let snapshots = read_snapshot_lites_by_root_file( + self.ctx.clone(), snapshot_location, snapshot_version, - tbl.meta_location_generator().clone(), - ); - - // map snapshot to data block - let block_stream = snapshot_stream.map(move |snapshot| match snapshot { - Ok(snapshot) => Self::snapshots_to_block( - &meta_location_generator, - vec![snapshot], - snapshot_version, - ), - Err(e) => Err(e), - }); - - // limit if necessary - if let Some(limit) = limit { - Ok(block_stream.take(limit).boxed()) - } else { - Ok(block_stream.boxed()) - } - } else { - // to carries the schema info back to caller, instead of an empty stream, - // a stream of single empty block item returned - let data_block = DataBlock::empty_with_schema(FuseSnapshot::schema()); - Ok(DataBlockStream::create(FuseSnapshot::schema(), None, vec![data_block]).boxed()) + &self.table.operator, + ) + .await?; + return self.to_block(&meta_location_generator, &snapshots, snapshot_version); } + Ok(DataBlock::empty_with_schema(FuseSnapshot::schema())) } - fn snapshots_to_block( + fn to_block( + &self, location_generator: &TableMetaLocationGenerator, - snapshots: Vec>, + snapshots: &[TableSnapshotLite], latest_snapshot_version: u64, ) -> Result { let len = snapshots.len(); @@ -151,8 +82,8 @@ impl FuseSnapshot { None => (None, 0), }; prev_snapshot_ids.push(id); - format_versions.push(current_snapshot_version); - 
segment_count.push(s.segments.len() as u64); + format_versions.push(s.format_version); + segment_count.push(s.segment_count as u64); block_count.push(s.summary.block_count); row_count.push(s.summary.row_count); compressed.push(s.summary.compressed_byte_size); diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs index e4655f6a284c3..763378eb54a11 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::sync::Arc; use common_catalog::catalog::CATALOG_DEFAULT; +use common_datablocks::DataBlock; use common_exception::Result; use common_legacy_expression::LegacyExpression; use common_legacy_planners::Extras; @@ -25,7 +26,9 @@ use common_legacy_planners::Statistics; use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; -use common_pipeline_sources::processors::sources::StreamSourceNoSkipEmpty; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_sources::processors::sources::AsyncSource; +use common_pipeline_sources::processors::sources::AsyncSourcer; use super::fuse_snapshot::FuseSnapshot; use super::table_args::parse_func_history_args; @@ -36,6 +39,7 @@ use crate::sessions::TableContext; use crate::table_functions::string_literal; use crate::table_functions::TableArgs; use crate::table_functions::TableFunction; +use crate::FuseTable; use crate::Table; const FUSE_FUNC_SNAPSHOT: &str = "fuse_snapshot"; @@ -74,10 +78,6 @@ impl FuseSnapshotTable { arg_table_name, })) } - - fn get_limit(plan: &ReadDataSourcePlan) -> Option { - plan.push_downs.as_ref().and_then(|extras| extras.limit) - } } #[async_trait::async_trait] @@ -108,29 +108,19 @@ impl Table for FuseSnapshotTable { fn read_data( &self, ctx: Arc, - plan: &ReadDataSourcePlan, + _: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { let output = OutputPort::create(); - let limit = Self::get_limit(plan); - let database_name = self.arg_database_name.to_owned(); - let table_name = self.arg_table_name.to_owned(); - let catalog_name = CATALOG_DEFAULT.to_owned(); - let snapshot_stream = FuseSnapshot::new_snapshot_history_stream( - ctx.clone(), - database_name, - table_name, - catalog_name, - limit, - ); - - // the underlying stream may returns a single empty block, which carries the schema - let source = StreamSourceNoSkipEmpty::create(ctx, Some(snapshot_stream), output.clone())?; - pipeline.add_pipe(Pipe::SimplePipe { inputs_port: vec![], - outputs_port: vec![output], - processors: vec![source], + outputs_port: vec![output.clone()], + processors: vec![FuseSnapshotSource::create( + ctx, + output, + self.arg_database_name.to_owned(), + self.arg_table_name.to_owned(), + )?], }); Ok(()) @@ -147,3 +137,57 @@ impl TableFunction for FuseSnapshotTable { self } } + +struct FuseSnapshotSource { + finish: bool, + ctx: Arc, + arg_database_name: String, + arg_table_name: String, +} + +impl FuseSnapshotSource { + pub fn create( + ctx: Arc, + output: Arc, + arg_database_name: String, + arg_table_name: String, + ) -> Result { + AsyncSourcer::create(ctx.clone(), output, FuseSnapshotSource { + ctx, + finish: false, + arg_table_name, + arg_database_name, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for FuseSnapshotSource { + const NAME: &'static str = 
"fuse_snapshot"; + + #[async_trait::unboxed_simple] + async fn generate(&mut self) -> Result> { + if self.finish { + return Ok(None); + } + + self.finish = true; + let tenant_id = self.ctx.get_tenant(); + let tbl = self + .ctx + .get_catalog(CATALOG_DEFAULT)? + .get_table( + tenant_id.as_str(), + self.arg_database_name.as_str(), + self.arg_table_name.as_str(), + ) + .await?; + + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + Ok(Some( + FuseSnapshot::new(self.ctx.clone(), tbl) + .get_snapshots() + .await?, + )) + } +}