Skip to content

feat: keep a hint of last snapshot location while committing new snapshot #7418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 2, 2022
40 changes: 40 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::base::tokio;
use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_storages_fuse::FuseTable;
use futures::TryStreamExt;

use crate::storages::fuse::table_test_fixture::execute_query;
Expand Down Expand Up @@ -76,3 +78,41 @@ async fn test_fuse_occ_retry() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_last_snapshot_hint() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();
fixture.create_default_table().await?;

let table = fixture.latest_default_table().await?;

let num_blocks = 1;
let rows_per_block = 1;
let value_start_from = 1;
let stream =
TestFixture::gen_sample_blocks_stream_ex(num_blocks, rows_per_block, value_start_from);

let blocks = stream.try_collect().await?;
fixture
.append_commit_blocks(table.clone(), blocks, false, true)
.await?;

// check last snapshot hit file
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let last_snapshot_location = fuse_table.snapshot_loc().unwrap();
let operator = ctx.get_storage_operator()?;
let location = fuse_table
.meta_location_generator()
.gen_last_snapshot_hint_location();
let storage_meta_data = operator.metadata();
let storage_prefix = storage_meta_data.root();

let expected = format!("{}{}", storage_prefix, last_snapshot_location);
let content = operator.object(location.as_str()).read().await?;

assert_eq!(content.as_slice(), expected.as_bytes());

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
}
}

let (new_snapshot, _) = mutator.into_new_snapshot().await?;
let new_snapshot = mutator.into_new_snapshot().await?;

// half segments left after deletion
assert_eq!(new_snapshot.segments.len(), 50);
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse-meta/src/meta/versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::meta::Versioned;
// impl Versioned<0> for v0::SegmentInfo {}
// impl Versioned<1> for v0::SegmentInfo {}
//
// Fortunately, since v0::SegmentInfo::VESION is used in
// Fortunately, since v0::SegmentInfo::VERSION is used in
// several places, compiler will report compile error if it
// can not deduce a unique value the constant expression.

Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
pub const FUSE_TBL_SEGMENT_PREFIX: &str = "_sg";
pub const FUSE_TBL_SNAPSHOT_PREFIX: &str = "_ss";
pub const FUSE_TBL_LAST_SNAPSHOT_HINT: &str = "last_snapshot_location_hint";

pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000;
pub const DEFAULT_BLOCK_SIZE_IN_MEM_SIZE_THRESHOLD: usize = 100 * 1024 * 1024;
Expand Down
73 changes: 14 additions & 59 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@ use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use common_cache::Cache;
use common_catalog::catalog::StorageDescription;
use common_catalog::table_context::TableContext;
use common_catalog::table_mutator::TableMutator;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_fuse_meta::caches::CacheManager;
use common_fuse_meta::meta::ClusterKey;
use common_fuse_meta::meta::Statistics as FuseStatistics;
use common_fuse_meta::meta::TableSnapshot;
use common_fuse_meta::meta::Versioned;
use common_legacy_parser::ExpressionParser;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
use common_meta_app::schema::UpdateTableMetaReq;
use common_meta_types::MatchSeq;
use common_planners::DeletePlan;
use common_planners::Expression;
use common_planners::Extras;
Expand All @@ -43,7 +38,6 @@ use common_planners::TruncateTablePlan;
use common_storages_util::storage_context::StorageContext;
use uuid::Uuid;

use crate::io::write_meta;
use crate::io::BlockCompactor;
use crate::io::MetaReaders;
use crate::io::TableMetaLocationGenerator;
Expand Down Expand Up @@ -174,53 +168,6 @@ impl FuseTable {
}
}

pub async fn update_table_meta(
&self,
ctx: &dyn TableContext,
catalog_name: &str,
snapshot: &TableSnapshot,
meta: &mut TableMeta,
) -> Result<()> {
let uuid = snapshot.snapshot_id;
let snapshot_loc = self
.meta_location_generator()
.snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
let operator = ctx.get_storage_operator()?;
write_meta(&operator, &snapshot_loc, snapshot).await?;

// set new snapshot location
meta.options
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone());
// remove legacy options
meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);

let table_id = self.table_info.ident.table_id;
let table_version = self.table_info.ident.seq;
let req = UpdateTableMetaReq {
table_id,
seq: MatchSeq::Exact(table_version),
new_table_meta: meta.clone(),
};

let catalog = ctx.get_catalog(catalog_name)?;
let result = catalog.update_table_meta(req).await;
match result {
Ok(_) => {
if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
let cache = &mut snapshot_cache.write().await;
cache.put(snapshot_loc, Arc::new(snapshot.clone()));
}
Ok(())
}
Err(e) => {
// commit snapshot to meta server failed, try to delete it.
// "major GC" will collect this, if deletion failure (even after DAL retried)
let _ = operator.object(&snapshot_loc).delete().await;
Err(e)
}
}
}

pub fn transient(&self) -> bool {
self.table_info.meta.options.contains_key("TRANSIENT")
}
Expand Down Expand Up @@ -297,11 +244,15 @@ impl Table for FuseTable {
cluster_key_meta,
);

self.update_table_meta(
let mut table_info = self.table_info.clone();
table_info.meta = new_table_meta;

FuseTable::commit_to_meta_server(
ctx.as_ref(),
catalog_name,
&new_snapshot,
&mut new_table_meta,
&table_info,
&self.meta_location_generator,
new_snapshot,
)
.await
}
Expand Down Expand Up @@ -341,11 +292,15 @@ impl Table for FuseTable {
None,
);

self.update_table_meta(
let mut table_info = self.table_info.clone();
table_info.meta = new_table_meta;

FuseTable::commit_to_meta_server(
ctx.as_ref(),
catalog_name,
&new_snapshot,
&mut new_table_meta,
&table_info,
&self.meta_location_generator,
new_snapshot,
)
.await
}
Expand Down
7 changes: 6 additions & 1 deletion src/query/storages/fuse/src/io/locations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::constants::FUSE_TBL_BLOCK_PREFIX;
use crate::constants::FUSE_TBL_SEGMENT_PREFIX;
use crate::constants::FUSE_TBL_SNAPSHOT_PREFIX;
use crate::FUSE_TBL_BLOCK_INDEX_PREFIX;
use crate::FUSE_TBL_LAST_SNAPSHOT_HINT;

static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData);
static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData);
Expand Down Expand Up @@ -75,7 +76,7 @@ impl TableMetaLocationGenerator {
)
}

pub fn gen_segment_info_location(&self) -> String where {
pub fn gen_segment_info_location(&self) -> String {
let segment_uuid = Uuid::new_v4().simple().to_string();
format!(
"{}/{}/{}_v{}.json",
Expand All @@ -98,6 +99,10 @@ impl TableMetaLocationGenerator {
SNAPSHOT_V0.version()
}
}

pub fn gen_last_snapshot_hint_location(&self) -> String {
format!("{}/{}", &self.prefix, FUSE_TBL_LAST_SNAPSHOT_HINT)
}
}

trait SnapshotLocationCreator {
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/io/write/meta_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::warn;
use crate::io::retry;
use crate::io::retry::Retryable;

pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: &T) -> Result<()>
pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: T) -> Result<()>
where T: Serialize {
let op = || async {
let bs = serde_json::to_vec(&meta).map_err(Error::other)?;
Expand Down
Loading