Skip to content

chore: try to fast read_snapshot #8098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 11, 2022
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 10 additions & 14 deletions src/query/service/src/procedures/systems/fuse_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::Result;
use common_streams::SendableDataBlockStream;
use common_storages_fuse::FuseTable;

use crate::procedures::procedure::StreamProcedure;
use crate::procedures::OneBlockProcedure;
use crate::procedures::Procedure;
use crate::procedures::ProcedureFeatures;
use crate::sessions::QueryContext;
Expand All @@ -35,7 +36,7 @@ impl FuseSnapshotProcedure {
}

#[async_trait::async_trait]
impl StreamProcedure for FuseSnapshotProcedure {
impl OneBlockProcedure for FuseSnapshotProcedure {
fn name(&self) -> &str {
"FUSE_SNAPSHOT"
}
Expand All @@ -44,27 +45,22 @@ impl StreamProcedure for FuseSnapshotProcedure {
ProcedureFeatures::default().variadic_arguments(2, 3)
}

async fn data_stream(
&self,
ctx: Arc<QueryContext>,
args: Vec<String>,
) -> Result<SendableDataBlockStream> {
let catalog_name = ctx.get_current_catalog();
let tenant_id = ctx.get_tenant();
async fn all_data(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
let database_name = args[0].clone();
let table_name = args[1].clone();

let limit = args.get(2).map(|arg| arg.parse::<usize>()).transpose()?;
let tenant_id = ctx.get_tenant();
let tbl = ctx
.get_catalog(&catalog_name)?
.get_catalog(&ctx.get_current_catalog())?
.get_table(
tenant_id.as_str(),
database_name.as_str(),
table_name.as_str(),
)
.await?;

FuseSnapshot::new(ctx, tbl).get_history_stream_as_blocks(limit)
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

Ok(FuseSnapshot::new(ctx, tbl).get_snapshots().await?)
}

fn schema(&self) -> Arc<DataSchema> {
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ common-pipeline-transforms = { path = "../../pipeline/transforms" }
common-storage = { path = "../../../common/storage" }
common-storages-index = { path = "../index" }
common-storages-util = { path = "../util" }
common-streams = { path = "../../streams" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down
108 changes: 108 additions & 0 deletions src/query/storages/fuse/src/fuse_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;
use std::sync::Arc;

use common_base::base::tokio::sync::Semaphore;
use common_base::base::Runtime;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_fuse_meta::meta::TableSnapshot;
use futures_util::future;
use futures_util::TryStreamExt;
use opendal::ObjectMode;
use opendal::Operator;
use tracing::warn;
use tracing::Instrument;

use crate::io::MetaReaders;

/// Load a single table snapshot from `snapshot_location`, decoding it
/// with the given `format_version`.
async fn read_snapshot(
    ctx: Arc<dyn TableContext>,
    snapshot_location: String,
    format_version: u64,
) -> Result<Arc<TableSnapshot>> {
    MetaReaders::table_snapshot_reader(ctx)
        .read(snapshot_location, None, format_version)
        .await
}

/// Concurrently read every snapshot file that resides in the same
/// directory as `root_snapshot_file`.
///
/// The parent directory of the root snapshot location is listed through
/// `data_accessor`; each entry of mode `FILE` is treated as a snapshot and
/// decoded with `format_version`. Reads are spawned onto a dedicated
/// runtime and throttled by the `max_storage_io_requests` setting.
///
/// Returns one `Result` per snapshot so that a single unreadable snapshot
/// does not fail the whole batch; a join failure (panicked/aborted task)
/// is surfaced as `ErrorCode::StorageOther`.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn read_snapshots_by_root_file(
    ctx: Arc<dyn TableContext>,
    root_snapshot_file: String,
    format_version: u64,
    data_accessor: &Operator,
) -> Result<Vec<Result<Arc<TableSnapshot>>>> {
    let mut snapshot_files = vec![];
    if let Some(path) = Path::new(&root_snapshot_file).parent() {
        let snapshot_prefix = path.to_str().unwrap_or("");
        // A prefix without '/' would be (close to) the storage root; listing it
        // could sweep up unrelated objects, so only list well-formed prefixes.
        if snapshot_prefix.contains('/') {
            let mut ds = data_accessor.object(snapshot_prefix).list().await?;
            // ObjectStreamer implements `futures::Stream`
            while let Some(de) = ds.try_next().await? {
                match de.mode() {
                    ObjectMode::FILE => {
                        snapshot_files.push(de.path().to_string());
                    }
                    _ => {
                        // Non-file entries (e.g. sub-directories) are skipped.
                        warn!(
                            "non-snapshot entry found in {:}, entry: {:?}",
                            snapshot_prefix, de
                        );
                    }
                }
            }
        }
    }

    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;

    // 1.1 combine all the tasks.
    let mut iter = snapshot_files.iter();
    let tasks = std::iter::from_fn(move || {
        if let Some(location) = iter.next() {
            let ctx = ctx.clone();
            let location = location.clone();
            Some(
                read_snapshot(ctx, location, format_version)
                    .instrument(tracing::debug_span!("read_snapshot")),
            )
        } else {
            None
        }
    });

    // 1.2 build the runtime; concurrency is bounded by the IO-request semaphore.
    let semaphore = Arc::new(Semaphore::new(max_io_requests));
    let segments_runtime = Arc::new(Runtime::with_worker_threads(
        max_runtime_threads,
        Some("fuse-req-snapshots-worker".to_owned()),
    )?);

    // 1.3 spawn all the tasks to the runtime.
    let join_handlers = segments_runtime
        .try_spawn_batch(semaphore.clone(), tasks)
        .await?;

    // 1.4 collect all the results; per-snapshot errors are preserved inside
    // the Vec, only a join-level failure aborts the whole call.
    let joint: Vec<Result<Arc<TableSnapshot>>> = future::try_join_all(join_handlers)
        .instrument(tracing::debug_span!("read_snapshots_join_all"))
        .await
        .map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))?;
    Ok(joint)
}
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod constants;
mod fuse_lazy_part;
mod fuse_part;
mod fuse_segment;
mod fuse_snapshot;
mod fuse_table;
pub mod io;
pub mod operations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl FuseHistorySource {

#[async_trait::async_trait]
impl AsyncSource for FuseHistorySource {
const NAME: &'static str = "fuse_snapshot";
const NAME: &'static str = "fuse_segment";

#[async_trait::unboxed_simple]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,113 +14,46 @@

use std::sync::Arc;

use common_catalog::table::Table;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::Result;
use common_fuse_meta::meta::TableSnapshot;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;

use crate::io::MetaReaders;
use crate::io::SnapshotHistoryReader;
use crate::fuse_snapshot::read_snapshots_by_root_file;
use crate::io::TableMetaLocationGenerator;
use crate::sessions::TableContext;
use crate::FuseTable;

pub struct FuseSnapshot {
pub struct FuseSnapshot<'a> {
pub ctx: Arc<dyn TableContext>,
pub table: Arc<dyn Table>,
pub table: &'a FuseTable,
}

impl FuseSnapshot {
pub fn new(ctx: Arc<dyn TableContext>, table: Arc<dyn Table>) -> Self {
impl<'a> FuseSnapshot<'a> {
pub fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable) -> Self {
Self { ctx, table }
}

/// Get table snapshot history as stream of data block
/// For cases that caller inside sync context, i.e. using async catalog api is not convenient
pub fn new_snapshot_history_stream(
ctx: Arc<dyn TableContext>,
database_name: String,
table_name: String,
catalog_name: String,
limit: Option<usize>,
) -> SendableDataBlockStream {
// prepare the future that resolved the table
let table_instance = {
let ctx = ctx.clone();
async move {
let tenant_id = ctx.get_tenant();
let tbl = ctx
.get_catalog(catalog_name.as_str())?
.get_table(
tenant_id.as_str(),
database_name.as_str(),
table_name.as_str(),
)
.await?;
Ok(tbl)
}
};

// chain the future with a stream of snapshot, for the resolved table, as data blocks
let resolved_table = futures::stream::once(table_instance);
let stream = resolved_table.map(move |item: Result<_>| match item {
Ok(table) => {
let fuse_snapshot = FuseSnapshot::new(ctx.clone(), table);
Ok(fuse_snapshot.get_history_stream_as_blocks(limit)?)
}
Err(e) => Err(e),
});

// flat it into single stream of data blocks
stream.try_flatten().boxed()
}

pub fn get_history_stream_as_blocks(
self,
limit: Option<usize>,
) -> Result<SendableDataBlockStream> {
let tbl = FuseTable::try_from_table(self.table.as_ref())?;
let snapshot_location = tbl.snapshot_loc();
let meta_location_generator = tbl.meta_location_generator.clone();
pub async fn get_snapshots(self) -> Result<DataBlock> {
let meta_location_generator = self.table.meta_location_generator.clone();
let snapshot_location = self.table.snapshot_loc();
if let Some(snapshot_location) = snapshot_location {
let snapshot_version = tbl.snapshot_format_version();
let snapshot_reader = MetaReaders::table_snapshot_reader(self.ctx.clone());
let snapshot_stream = snapshot_reader.snapshot_history(
let snapshot_version = self.table.snapshot_format_version();
let snapshots_results = read_snapshots_by_root_file(
self.ctx.clone(),
snapshot_location,
snapshot_version,
tbl.meta_location_generator().clone(),
);

// map snapshot to data block
let block_stream = snapshot_stream.map(move |snapshot| match snapshot {
Ok(snapshot) => Self::snapshots_to_block(
&meta_location_generator,
vec![snapshot],
snapshot_version,
),
Err(e) => Err(e),
});

// limit if necessary
if let Some(limit) = limit {
Ok(block_stream.take(limit).boxed())
} else {
Ok(block_stream.boxed())
}
} else {
// to carries the schema info back to caller, instead of an empty stream,
// a stream of single empty block item returned
let data_block = DataBlock::empty_with_schema(FuseSnapshot::schema());
Ok(DataBlockStream::create(FuseSnapshot::schema(), None, vec![data_block]).boxed())
&self.table.operator,
)
.await?;
let snapshots = snapshots_results.into_iter().flatten().collect();
return self.to_block(&meta_location_generator, snapshots, snapshot_version);
}
Ok(DataBlock::empty_with_schema(FuseSnapshot::schema()))
}

fn snapshots_to_block(
fn to_block(
&self,
location_generator: &TableMetaLocationGenerator,
snapshots: Vec<Arc<TableSnapshot>>,
latest_snapshot_version: u64,
Expand Down
Loading